• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::cmp::min;
15 use std::ops::Deref;
16 use std::pin::Pin;
17 use std::sync::atomic::Ordering;
18 use std::task::{Context, Poll};
19 
20 use ylong_http::error::HttpError;
21 use ylong_http::h3::{
22     Frame, H3Error, H3ErrorCode, Headers, Parts, Payload, PseudoHeaders, HEADERS_FRAME_TYPE,
23 };
24 use ylong_http::request::uri::Scheme;
25 use ylong_http::request::RequestPart;
26 use ylong_http::response::status::StatusCode;
27 use ylong_http::response::ResponsePart;
28 use ylong_runtime::io::ReadBuf;
29 
30 use crate::async_impl::conn::StreamData;
31 use crate::async_impl::request::Message;
32 use crate::async_impl::{HttpBody, Response};
33 use crate::runtime::AsyncRead;
34 use crate::util::config::HttpVersion;
35 use crate::util::data_ref::BodyDataRef;
36 use crate::util::dispatcher::http3::{DispatchErrorKind, Http3Conn, RequestWrapper, RespMessage};
37 use crate::util::normalizer::BodyLengthParser;
38 use crate::ErrorKind::BodyTransfer;
39 use crate::{ErrorKind, HttpClientError};
40 
request<S>( mut conn: Http3Conn<S>, mut message: Message, ) -> Result<Response, HttpClientError> where S: Sync + Send + Unpin + 'static,41 pub(crate) async fn request<S>(
42     mut conn: Http3Conn<S>,
43     mut message: Message,
44 ) -> Result<Response, HttpClientError>
45 where
46     S: Sync + Send + Unpin + 'static,
47 {
48     message
49         .interceptor
50         .intercept_request(message.request.ref_mut())?;
51     let part = message.request.ref_mut().part().clone();
52 
53     // TODO Implement trailer.
54     let headers = build_headers_frame(part)
55         .map_err(|e| HttpClientError::from_error(ErrorKind::Request, e))?;
56     let data = BodyDataRef::new(message.request.clone(), conn.speed_controller.clone());
57     let stream = RequestWrapper {
58         header: headers,
59         data,
60     };
61     conn.send_frame_to_reader(stream)?;
62     let frame = conn.recv_resp().await?;
63     frame_2_response(conn, frame, message)
64 }
65 
build_headers_frame(mut part: RequestPart) -> Result<Frame, HttpError>66 pub(crate) fn build_headers_frame(mut part: RequestPart) -> Result<Frame, HttpError> {
67     // todo: check rfc to see if any headers should be removed
68     let pseudo = build_pseudo_headers(&mut part)?;
69     let mut header_part = Parts::new();
70     header_part.set_header_lines(part.headers);
71     header_part.set_pseudo(pseudo);
72     let headers_payload = Headers::new(header_part);
73 
74     Ok(Frame::new(
75         HEADERS_FRAME_TYPE,
76         Payload::Headers(headers_payload),
77     ))
78 }
79 
80 // todo: error if headers not enough, should meet rfc
build_pseudo_headers(request_part: &mut RequestPart) -> Result<PseudoHeaders, HttpError>81 fn build_pseudo_headers(request_part: &mut RequestPart) -> Result<PseudoHeaders, HttpError> {
82     let mut pseudo = PseudoHeaders::default();
83     match request_part.uri.scheme() {
84         Some(scheme) => {
85             pseudo.set_scheme(Some(String::from(scheme.as_str())));
86         }
87         None => pseudo.set_scheme(Some(String::from(Scheme::HTTPS.as_str()))),
88     }
89     pseudo.set_method(Some(String::from(request_part.method.as_str())));
90     pseudo.set_path(
91         request_part
92             .uri
93             .path_and_query()
94             .or_else(|| Some(String::from("/"))),
95     );
96     let host = request_part
97         .headers
98         .remove("host")
99         .and_then(|auth| auth.to_string().ok());
100     pseudo.set_authority(host);
101     Ok(pseudo)
102 }
103 
frame_2_response<S>( conn: Http3Conn<S>, headers_frame: Frame, mut message: Message, ) -> Result<Response, HttpClientError> where S: Sync + Send + Unpin + 'static,104 fn frame_2_response<S>(
105     conn: Http3Conn<S>,
106     headers_frame: Frame,
107     mut message: Message,
108 ) -> Result<Response, HttpClientError>
109 where
110     S: Sync + Send + Unpin + 'static,
111 {
112     let part = match headers_frame.payload() {
113         Payload::Headers(headers) => {
114             let part = headers.get_part();
115             let (pseudo, fields) = part.parts();
116             let status_code = match pseudo.status() {
117                 Some(status) => StatusCode::from_bytes(status.as_bytes())
118                     .map_err(|e| HttpClientError::from_error(ErrorKind::Request, e))?,
119                 None => {
120                     return Err(HttpClientError::from_str(
121                         ErrorKind::Request,
122                         "status code not found",
123                     ));
124                 }
125             };
126             ResponsePart {
127                 version: ylong_http::version::Version::HTTP3,
128                 status: status_code,
129                 headers: fields.clone(),
130             }
131         }
132         Payload::PushPromise(_) => {
133             todo!();
134         }
135         _ => {
136             return Err(HttpClientError::from_str(ErrorKind::Request, "bad frame"));
137         }
138     };
139 
140     let data_io = TextIo::new(conn);
141     let length = match BodyLengthParser::new(message.request.ref_mut().method(), &part).parse() {
142         Ok(length) => length,
143         Err(e) => {
144             return Err(e);
145         }
146     };
147     let body = HttpBody::new(message.interceptor, length, Box::new(data_io), &[0u8; 0])?;
148 
149     Ok(Response::new(
150         ylong_http::response::Response::from_raw_parts(part, body),
151     ))
152 }
153 
154 struct TextIo<S> {
155     pub(crate) handle: Http3Conn<S>,
156     pub(crate) offset: usize,
157     pub(crate) remain: Option<Frame>,
158     pub(crate) is_closed: bool,
159 }
160 
161 struct HttpReadBuf<'a, 'b> {
162     buf: &'a mut ReadBuf<'b>,
163 }
164 
165 impl<'a, 'b> HttpReadBuf<'a, 'b> {
append_slice(&mut self, buf: &[u8])166     pub(crate) fn append_slice(&mut self, buf: &[u8]) {
167         #[cfg(feature = "ylong_base")]
168         self.buf.append(buf);
169 
170         #[cfg(feature = "tokio_base")]
171         self.buf.put_slice(buf);
172     }
173 }
174 
175 impl<'a, 'b> Deref for HttpReadBuf<'a, 'b> {
176     type Target = ReadBuf<'b>;
177 
deref(&self) -> &Self::Target178     fn deref(&self) -> &Self::Target {
179         self.buf
180     }
181 }
182 
183 impl<S> TextIo<S>
184 where
185     S: Sync + Send + Unpin + 'static,
186 {
new(handle: Http3Conn<S>) -> Self187     pub(crate) fn new(handle: Http3Conn<S>) -> Self {
188         Self {
189             handle,
190             offset: 0,
191             remain: None,
192             is_closed: false,
193         }
194     }
195 
match_channel_message( poll_result: Poll<RespMessage>, text_io: &mut TextIo<S>, buf: &mut HttpReadBuf, ) -> Option<Poll<std::io::Result<()>>>196     fn match_channel_message(
197         poll_result: Poll<RespMessage>,
198         text_io: &mut TextIo<S>,
199         buf: &mut HttpReadBuf,
200     ) -> Option<Poll<std::io::Result<()>>> {
201         match poll_result {
202             Poll::Ready(RespMessage::Output(frame)) => match frame.payload() {
203                 Payload::Headers(_) => {
204                     text_io.remain = Some(frame);
205                     text_io.offset = 0;
206                     Some(Poll::Ready(Ok(())))
207                 }
208                 Payload::Data(data) => {
209                     let data = data.data();
210                     let unfilled_len = buf.remaining();
211                     let data_len = data.len();
212                     let fill_len = min(data_len, unfilled_len);
213                     if unfilled_len < data_len {
214                         buf.append_slice(&data[..fill_len]);
215                         text_io.offset += fill_len;
216                         text_io.remain = Some(frame);
217                         Some(Poll::Ready(Ok(())))
218                     } else {
219                         buf.append_slice(&data[..fill_len]);
220                         Self::end_read(text_io, data_len)
221                     }
222                 }
223                 _ => Some(Poll::Ready(Err(std::io::Error::new(
224                     std::io::ErrorKind::Other,
225                     HttpError::from(H3Error::Connection(H3ErrorCode::H3InternalError)),
226                 )))),
227             },
228             Poll::Ready(RespMessage::OutputExit(e)) => match e {
229                 DispatchErrorKind::H3(H3Error::Connection(H3ErrorCode::H3NoError))
230                 | DispatchErrorKind::StreamFinished => {
231                     text_io.is_closed = true;
232                     Some(Poll::Ready(Ok(())))
233                 }
234                 _ => Some(Poll::Ready(Err(std::io::Error::new(
235                     std::io::ErrorKind::Other,
236                     HttpError::from(H3Error::Connection(H3ErrorCode::H3InternalError)),
237                 )))),
238             },
239             Poll::Pending => Some(Poll::Pending),
240         }
241     }
242 
end_read(text_io: &mut TextIo<S>, data_len: usize) -> Option<Poll<std::io::Result<()>>>243     fn end_read(text_io: &mut TextIo<S>, data_len: usize) -> Option<Poll<std::io::Result<()>>> {
244         text_io.offset = 0;
245         text_io.remain = None;
246         if data_len == 0 {
247             // no data read and is not end stream.
248             None
249         } else {
250             Some(Poll::Ready(Ok(())))
251         }
252     }
253 
read_remaining_data( text_io: &mut TextIo<S>, buf: &mut HttpReadBuf, ) -> Option<Poll<std::io::Result<()>>>254     fn read_remaining_data(
255         text_io: &mut TextIo<S>,
256         buf: &mut HttpReadBuf,
257     ) -> Option<Poll<std::io::Result<()>>> {
258         if let Some(frame) = &text_io.remain {
259             return match frame.payload() {
260                 Payload::Headers(_) => Some(Poll::Ready(Ok(()))),
261                 Payload::Data(data) => {
262                     let data = data.data();
263                     let unfilled_len = buf.remaining();
264                     let data_len = data.len() - text_io.offset;
265                     let fill_len = min(unfilled_len, data_len);
266                     // The peripheral function already ensures that the remaing of buf will not be
267                     // 0.
268                     if unfilled_len < data_len {
269                         buf.append_slice(&data[text_io.offset..text_io.offset + fill_len]);
270                         text_io.offset += fill_len;
271                         Some(Poll::Ready(Ok(())))
272                     } else {
273                         buf.append_slice(&data[text_io.offset..text_io.offset + fill_len]);
274                         Self::end_read(text_io, data_len)
275                     }
276                 }
277                 _ => Some(Poll::Ready(Err(std::io::Error::new(
278                     std::io::ErrorKind::Other,
279                     HttpError::from(H3Error::Connection(H3ErrorCode::H3InternalError)),
280                 )))),
281             };
282         }
283         None
284     }
285 }
286 
287 impl<S: Sync + Send + Unpin + 'static> StreamData for TextIo<S> {
shutdown(&self)288     fn shutdown(&self) {
289         self.handle.io_shutdown.store(true, Ordering::Relaxed);
290     }
291 
is_stream_closable(&self) -> bool292     fn is_stream_closable(&self) -> bool {
293         self.is_closed
294     }
295 
http_version(&self) -> HttpVersion296     fn http_version(&self) -> HttpVersion {
297         HttpVersion::Http3
298     }
299 }
300 
301 impl<S: Sync + Send + Unpin + 'static> AsyncRead for TextIo<S> {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>302     fn poll_read(
303         self: Pin<&mut Self>,
304         cx: &mut Context<'_>,
305         buf: &mut ReadBuf<'_>,
306     ) -> Poll<std::io::Result<()>> {
307         let mut buf = HttpReadBuf { buf };
308         let text_io = self.get_mut();
309         if buf.remaining() == 0 || text_io.is_closed {
310             return Poll::Ready(Ok(()));
311         }
312         if text_io
313             .handle
314             .speed_controller
315             .poll_recv_pending_timeout(cx)
316         {
317             return Poll::Ready(Err(std::io::Error::new(
318                 std::io::ErrorKind::TimedOut,
319                 HttpClientError::from_str(BodyTransfer, "Below low speed limit"),
320             )));
321         }
322         text_io.handle.speed_controller.init_min_recv_if_not_start();
323         if text_io
324             .handle
325             .speed_controller
326             .poll_max_recv_delay_time(cx)
327             .is_pending()
328         {
329             return Poll::Pending;
330         }
331         text_io.handle.speed_controller.init_max_recv_if_not_start();
332         while buf.remaining() != 0 {
333             if let Some(result) = Self::read_remaining_data(text_io, &mut buf) {
334                 return match result {
335                     Poll::Ready(Ok(_)) => {
336                         let filled: usize = buf.filled().len();
337                         text_io
338                             .handle
339                             .speed_controller
340                             .min_recv_speed_limit(filled)
341                             .map_err(|_e| std::io::Error::from(std::io::ErrorKind::Other))?;
342                         text_io
343                             .handle
344                             .speed_controller
345                             .delay_max_recv_speed_limit(filled);
346                         text_io.handle.speed_controller.reset_recv_pending_timeout();
347                         Poll::Ready(Ok(()))
348                     }
349                     Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
350                     Poll::Pending => Poll::Pending,
351                 };
352             }
353 
354             let poll_result = text_io
355                 .handle
356                 .resp_receiver
357                 .poll_recv(cx)
358                 .map_err(|_e| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))?;
359 
360             if let Some(result) = Self::match_channel_message(poll_result, text_io, &mut buf) {
361                 return match result {
362                     Poll::Ready(Ok(_)) => {
363                         let filled: usize = buf.filled().len();
364                         text_io
365                             .handle
366                             .speed_controller
367                             .min_recv_speed_limit(filled)
368                             .map_err(|_e| std::io::Error::from(std::io::ErrorKind::Other))?;
369                         text_io
370                             .handle
371                             .speed_controller
372                             .delay_max_recv_speed_limit(filled);
373                         text_io.handle.speed_controller.reset_recv_pending_timeout();
374                         Poll::Ready(Ok(()))
375                     }
376                     Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
377                     Poll::Pending => Poll::Pending,
378                 };
379             }
380         }
381         Poll::Ready(Ok(()))
382     }
383 }
384