1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::cmp::min;
15 use std::ops::Deref;
16 use std::pin::Pin;
17 use std::sync::atomic::Ordering;
18 use std::task::{Context, Poll};
19
20 use ylong_http::error::HttpError;
21 use ylong_http::h3::{
22 Frame, H3Error, H3ErrorCode, Headers, Parts, Payload, PseudoHeaders, HEADERS_FRAME_TYPE,
23 };
24 use ylong_http::request::uri::Scheme;
25 use ylong_http::request::RequestPart;
26 use ylong_http::response::status::StatusCode;
27 use ylong_http::response::ResponsePart;
28 use ylong_runtime::io::ReadBuf;
29
30 use crate::async_impl::conn::StreamData;
31 use crate::async_impl::request::Message;
32 use crate::async_impl::{HttpBody, Response};
33 use crate::runtime::AsyncRead;
34 use crate::util::config::HttpVersion;
35 use crate::util::data_ref::BodyDataRef;
36 use crate::util::dispatcher::http3::{DispatchErrorKind, Http3Conn, RequestWrapper, RespMessage};
37 use crate::util::normalizer::BodyLengthParser;
38 use crate::ErrorKind::BodyTransfer;
39 use crate::{ErrorKind, HttpClientError};
40
request<S>( mut conn: Http3Conn<S>, mut message: Message, ) -> Result<Response, HttpClientError> where S: Sync + Send + Unpin + 'static,41 pub(crate) async fn request<S>(
42 mut conn: Http3Conn<S>,
43 mut message: Message,
44 ) -> Result<Response, HttpClientError>
45 where
46 S: Sync + Send + Unpin + 'static,
47 {
48 message
49 .interceptor
50 .intercept_request(message.request.ref_mut())?;
51 let part = message.request.ref_mut().part().clone();
52
53 // TODO Implement trailer.
54 let headers = build_headers_frame(part)
55 .map_err(|e| HttpClientError::from_error(ErrorKind::Request, e))?;
56 let data = BodyDataRef::new(message.request.clone(), conn.speed_controller.clone());
57 let stream = RequestWrapper {
58 header: headers,
59 data,
60 };
61 conn.send_frame_to_reader(stream)?;
62 let frame = conn.recv_resp().await?;
63 frame_2_response(conn, frame, message)
64 }
65
build_headers_frame(mut part: RequestPart) -> Result<Frame, HttpError>66 pub(crate) fn build_headers_frame(mut part: RequestPart) -> Result<Frame, HttpError> {
67 // todo: check rfc to see if any headers should be removed
68 let pseudo = build_pseudo_headers(&mut part)?;
69 let mut header_part = Parts::new();
70 header_part.set_header_lines(part.headers);
71 header_part.set_pseudo(pseudo);
72 let headers_payload = Headers::new(header_part);
73
74 Ok(Frame::new(
75 HEADERS_FRAME_TYPE,
76 Payload::Headers(headers_payload),
77 ))
78 }
79
80 // todo: error if headers not enough, should meet rfc
build_pseudo_headers(request_part: &mut RequestPart) -> Result<PseudoHeaders, HttpError>81 fn build_pseudo_headers(request_part: &mut RequestPart) -> Result<PseudoHeaders, HttpError> {
82 let mut pseudo = PseudoHeaders::default();
83 match request_part.uri.scheme() {
84 Some(scheme) => {
85 pseudo.set_scheme(Some(String::from(scheme.as_str())));
86 }
87 None => pseudo.set_scheme(Some(String::from(Scheme::HTTPS.as_str()))),
88 }
89 pseudo.set_method(Some(String::from(request_part.method.as_str())));
90 pseudo.set_path(
91 request_part
92 .uri
93 .path_and_query()
94 .or_else(|| Some(String::from("/"))),
95 );
96 let host = request_part
97 .headers
98 .remove("host")
99 .and_then(|auth| auth.to_string().ok());
100 pseudo.set_authority(host);
101 Ok(pseudo)
102 }
103
frame_2_response<S>( conn: Http3Conn<S>, headers_frame: Frame, mut message: Message, ) -> Result<Response, HttpClientError> where S: Sync + Send + Unpin + 'static,104 fn frame_2_response<S>(
105 conn: Http3Conn<S>,
106 headers_frame: Frame,
107 mut message: Message,
108 ) -> Result<Response, HttpClientError>
109 where
110 S: Sync + Send + Unpin + 'static,
111 {
112 let part = match headers_frame.payload() {
113 Payload::Headers(headers) => {
114 let part = headers.get_part();
115 let (pseudo, fields) = part.parts();
116 let status_code = match pseudo.status() {
117 Some(status) => StatusCode::from_bytes(status.as_bytes())
118 .map_err(|e| HttpClientError::from_error(ErrorKind::Request, e))?,
119 None => {
120 return Err(HttpClientError::from_str(
121 ErrorKind::Request,
122 "status code not found",
123 ));
124 }
125 };
126 ResponsePart {
127 version: ylong_http::version::Version::HTTP3,
128 status: status_code,
129 headers: fields.clone(),
130 }
131 }
132 Payload::PushPromise(_) => {
133 todo!();
134 }
135 _ => {
136 return Err(HttpClientError::from_str(ErrorKind::Request, "bad frame"));
137 }
138 };
139
140 let data_io = TextIo::new(conn);
141 let length = match BodyLengthParser::new(message.request.ref_mut().method(), &part).parse() {
142 Ok(length) => length,
143 Err(e) => {
144 return Err(e);
145 }
146 };
147 let body = HttpBody::new(message.interceptor, length, Box::new(data_io), &[0u8; 0])?;
148
149 Ok(Response::new(
150 ylong_http::response::Response::from_raw_parts(part, body),
151 ))
152 }
153
154 struct TextIo<S> {
155 pub(crate) handle: Http3Conn<S>,
156 pub(crate) offset: usize,
157 pub(crate) remain: Option<Frame>,
158 pub(crate) is_closed: bool,
159 }
160
161 struct HttpReadBuf<'a, 'b> {
162 buf: &'a mut ReadBuf<'b>,
163 }
164
165 impl<'a, 'b> HttpReadBuf<'a, 'b> {
append_slice(&mut self, buf: &[u8])166 pub(crate) fn append_slice(&mut self, buf: &[u8]) {
167 #[cfg(feature = "ylong_base")]
168 self.buf.append(buf);
169
170 #[cfg(feature = "tokio_base")]
171 self.buf.put_slice(buf);
172 }
173 }
174
175 impl<'a, 'b> Deref for HttpReadBuf<'a, 'b> {
176 type Target = ReadBuf<'b>;
177
deref(&self) -> &Self::Target178 fn deref(&self) -> &Self::Target {
179 self.buf
180 }
181 }
182
183 impl<S> TextIo<S>
184 where
185 S: Sync + Send + Unpin + 'static,
186 {
new(handle: Http3Conn<S>) -> Self187 pub(crate) fn new(handle: Http3Conn<S>) -> Self {
188 Self {
189 handle,
190 offset: 0,
191 remain: None,
192 is_closed: false,
193 }
194 }
195
match_channel_message( poll_result: Poll<RespMessage>, text_io: &mut TextIo<S>, buf: &mut HttpReadBuf, ) -> Option<Poll<std::io::Result<()>>>196 fn match_channel_message(
197 poll_result: Poll<RespMessage>,
198 text_io: &mut TextIo<S>,
199 buf: &mut HttpReadBuf,
200 ) -> Option<Poll<std::io::Result<()>>> {
201 match poll_result {
202 Poll::Ready(RespMessage::Output(frame)) => match frame.payload() {
203 Payload::Headers(_) => {
204 text_io.remain = Some(frame);
205 text_io.offset = 0;
206 Some(Poll::Ready(Ok(())))
207 }
208 Payload::Data(data) => {
209 let data = data.data();
210 let unfilled_len = buf.remaining();
211 let data_len = data.len();
212 let fill_len = min(data_len, unfilled_len);
213 if unfilled_len < data_len {
214 buf.append_slice(&data[..fill_len]);
215 text_io.offset += fill_len;
216 text_io.remain = Some(frame);
217 Some(Poll::Ready(Ok(())))
218 } else {
219 buf.append_slice(&data[..fill_len]);
220 Self::end_read(text_io, data_len)
221 }
222 }
223 _ => Some(Poll::Ready(Err(std::io::Error::new(
224 std::io::ErrorKind::Other,
225 HttpError::from(H3Error::Connection(H3ErrorCode::H3InternalError)),
226 )))),
227 },
228 Poll::Ready(RespMessage::OutputExit(e)) => match e {
229 DispatchErrorKind::H3(H3Error::Connection(H3ErrorCode::H3NoError))
230 | DispatchErrorKind::StreamFinished => {
231 text_io.is_closed = true;
232 Some(Poll::Ready(Ok(())))
233 }
234 _ => Some(Poll::Ready(Err(std::io::Error::new(
235 std::io::ErrorKind::Other,
236 HttpError::from(H3Error::Connection(H3ErrorCode::H3InternalError)),
237 )))),
238 },
239 Poll::Pending => Some(Poll::Pending),
240 }
241 }
242
end_read(text_io: &mut TextIo<S>, data_len: usize) -> Option<Poll<std::io::Result<()>>>243 fn end_read(text_io: &mut TextIo<S>, data_len: usize) -> Option<Poll<std::io::Result<()>>> {
244 text_io.offset = 0;
245 text_io.remain = None;
246 if data_len == 0 {
247 // no data read and is not end stream.
248 None
249 } else {
250 Some(Poll::Ready(Ok(())))
251 }
252 }
253
read_remaining_data( text_io: &mut TextIo<S>, buf: &mut HttpReadBuf, ) -> Option<Poll<std::io::Result<()>>>254 fn read_remaining_data(
255 text_io: &mut TextIo<S>,
256 buf: &mut HttpReadBuf,
257 ) -> Option<Poll<std::io::Result<()>>> {
258 if let Some(frame) = &text_io.remain {
259 return match frame.payload() {
260 Payload::Headers(_) => Some(Poll::Ready(Ok(()))),
261 Payload::Data(data) => {
262 let data = data.data();
263 let unfilled_len = buf.remaining();
264 let data_len = data.len() - text_io.offset;
265 let fill_len = min(unfilled_len, data_len);
266 // The peripheral function already ensures that the remaing of buf will not be
267 // 0.
268 if unfilled_len < data_len {
269 buf.append_slice(&data[text_io.offset..text_io.offset + fill_len]);
270 text_io.offset += fill_len;
271 Some(Poll::Ready(Ok(())))
272 } else {
273 buf.append_slice(&data[text_io.offset..text_io.offset + fill_len]);
274 Self::end_read(text_io, data_len)
275 }
276 }
277 _ => Some(Poll::Ready(Err(std::io::Error::new(
278 std::io::ErrorKind::Other,
279 HttpError::from(H3Error::Connection(H3ErrorCode::H3InternalError)),
280 )))),
281 };
282 }
283 None
284 }
285 }
286
287 impl<S: Sync + Send + Unpin + 'static> StreamData for TextIo<S> {
shutdown(&self)288 fn shutdown(&self) {
289 self.handle.io_shutdown.store(true, Ordering::Relaxed);
290 }
291
is_stream_closable(&self) -> bool292 fn is_stream_closable(&self) -> bool {
293 self.is_closed
294 }
295
http_version(&self) -> HttpVersion296 fn http_version(&self) -> HttpVersion {
297 HttpVersion::Http3
298 }
299 }
300
301 impl<S: Sync + Send + Unpin + 'static> AsyncRead for TextIo<S> {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>302 fn poll_read(
303 self: Pin<&mut Self>,
304 cx: &mut Context<'_>,
305 buf: &mut ReadBuf<'_>,
306 ) -> Poll<std::io::Result<()>> {
307 let mut buf = HttpReadBuf { buf };
308 let text_io = self.get_mut();
309 if buf.remaining() == 0 || text_io.is_closed {
310 return Poll::Ready(Ok(()));
311 }
312 if text_io
313 .handle
314 .speed_controller
315 .poll_recv_pending_timeout(cx)
316 {
317 return Poll::Ready(Err(std::io::Error::new(
318 std::io::ErrorKind::TimedOut,
319 HttpClientError::from_str(BodyTransfer, "Below low speed limit"),
320 )));
321 }
322 text_io.handle.speed_controller.init_min_recv_if_not_start();
323 if text_io
324 .handle
325 .speed_controller
326 .poll_max_recv_delay_time(cx)
327 .is_pending()
328 {
329 return Poll::Pending;
330 }
331 text_io.handle.speed_controller.init_max_recv_if_not_start();
332 while buf.remaining() != 0 {
333 if let Some(result) = Self::read_remaining_data(text_io, &mut buf) {
334 return match result {
335 Poll::Ready(Ok(_)) => {
336 let filled: usize = buf.filled().len();
337 text_io
338 .handle
339 .speed_controller
340 .min_recv_speed_limit(filled)
341 .map_err(|_e| std::io::Error::from(std::io::ErrorKind::Other))?;
342 text_io
343 .handle
344 .speed_controller
345 .delay_max_recv_speed_limit(filled);
346 text_io.handle.speed_controller.reset_recv_pending_timeout();
347 Poll::Ready(Ok(()))
348 }
349 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
350 Poll::Pending => Poll::Pending,
351 };
352 }
353
354 let poll_result = text_io
355 .handle
356 .resp_receiver
357 .poll_recv(cx)
358 .map_err(|_e| std::io::Error::from(std::io::ErrorKind::ConnectionAborted))?;
359
360 if let Some(result) = Self::match_channel_message(poll_result, text_io, &mut buf) {
361 return match result {
362 Poll::Ready(Ok(_)) => {
363 let filled: usize = buf.filled().len();
364 text_io
365 .handle
366 .speed_controller
367 .min_recv_speed_limit(filled)
368 .map_err(|_e| std::io::Error::from(std::io::ErrorKind::Other))?;
369 text_io
370 .handle
371 .speed_controller
372 .delay_max_recv_speed_limit(filled);
373 text_io.handle.speed_controller.reset_recv_pending_timeout();
374 Poll::Ready(Ok(()))
375 }
376 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
377 Poll::Pending => Poll::Pending,
378 };
379 }
380 }
381 Poll::Ready(Ok(()))
382 }
383 }
384