// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13
14 use std::mem::take;
15 use std::pin::Pin;
16 use std::sync::Arc;
17 use std::task::{Context, Poll};
18 use std::time::Instant;
19
20 use ylong_http::body::async_impl::Body;
21 use ylong_http::body::{ChunkBody, TextBody};
22 use ylong_http::h1::{RequestEncoder, ResponseDecoder};
23 use ylong_http::request::uri::Scheme;
24 use ylong_http::response::ResponsePart;
25 use ylong_http::version::Version;
26
27 use super::StreamData;
28 use crate::async_impl::request::Message;
29 use crate::async_impl::{HttpBody, Request, Response};
30 use crate::error::HttpClientError;
31 use crate::runtime::{poll_fn, AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
32 use crate::util::config::HttpVersion;
33 use crate::util::dispatcher::http1::Http1Conn;
34 use crate::util::information::ConnInfo;
35 use crate::util::interceptor::Interceptors;
36 use crate::util::normalizer::BodyLengthParser;
37 use crate::ErrorKind::BodyTransfer;
38
/// Size in bytes (16 KiB) of the scratch buffer allocated per request; the
/// same buffer is used to encode the outgoing request and to read the
/// response head.
const TEMP_BUF_SIZE: usize = 16 * 1024;
40
request<S>( mut conn: Http1Conn<S>, mut message: Message, ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,41 pub(crate) async fn request<S>(
42 mut conn: Http1Conn<S>,
43 mut message: Message,
44 ) -> Result<Response, HttpClientError>
45 where
46 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
47 {
48 message
49 .interceptor
50 .intercept_request(message.request.ref_mut())?;
51 let mut buf = vec![0u8; TEMP_BUF_SIZE];
52
53 message
54 .request
55 .ref_mut()
56 .time_group_mut()
57 .set_transfer_start(Instant::now());
58 let mut guard = conn.cancel_guard();
59 encode_request_part(
60 message.request.ref_mut(),
61 &message.interceptor,
62 &mut conn,
63 &mut buf,
64 )
65 .await?;
66 encode_various_body(message.request.ref_mut(), &mut conn, &mut buf).await?;
67 // Decodes response part.
68 let (part, pre) = {
69 let mut decoder = ResponseDecoder::new();
70 loop {
71 let size = poll_fn(|cx| {
72 if conn.speed_controller.poll_recv_pending_timeout(cx) {
73 return Poll::Ready(Err(HttpClientError::from_str(
74 BodyTransfer,
75 "Below low speed limit",
76 )));
77 }
78 let result =
79 read_status_line(cx, &mut conn, message.request.ref_mut(), buf.as_mut_slice())?;
80 if let Poll::Ready(filled) = result {
81 conn.speed_controller.reset_recv_pending_timeout();
82 return Poll::Ready(Ok(filled));
83 }
84 Poll::Pending
85 })
86 .await?;
87
88 message.interceptor.intercept_output(&buf[..size])?;
89 match decoder.decode(&buf[..size]) {
90 Ok(None) => {}
91 Ok(Some((part, rem))) => break (part, rem),
92 Err(e) => {
93 conn.shutdown();
94 return err_from_other!(Request, e);
95 }
96 }
97 }
98 };
99 guard.normal_end();
100 // if task cancel occurs, we should shutdown io
101 drop(guard);
102
103 decode_response(message, part, conn, pre)
104 }
105
read_status_line<S>( cx: &mut Context<'_>, conn: &mut Http1Conn<S>, request: &mut Request, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>> where S: AsyncRead + Sync + Send + Unpin + 'static,106 fn read_status_line<S>(
107 cx: &mut Context<'_>,
108 conn: &mut Http1Conn<S>,
109 request: &mut Request,
110 buf: &mut [u8],
111 ) -> Poll<Result<usize, HttpClientError>>
112 where
113 S: AsyncRead + Sync + Send + Unpin + 'static,
114 {
115 let mut read_buf = ReadBuf::new(buf);
116 match Pin::new(conn.raw_mut()).poll_read(cx, &mut read_buf) {
117 Poll::Ready(Ok(_)) => {
118 #[cfg(feature = "ylong_base")]
119 let size = read_buf.filled_len();
120
121 #[cfg(feature = "tokio_base")]
122 let size = read_buf.filled().len();
123
124 if size == 0 {
125 conn.shutdown();
126 return Poll::Ready(err_from_msg!(Request, "Tcp closed"));
127 }
128 if request.time_group_mut().transfer_end_time().is_none() {
129 request.time_group_mut().set_transfer_end(Instant::now())
130 }
131 Poll::Ready(Ok(size))
132 }
133 Poll::Ready(Err(e)) => {
134 conn.shutdown();
135 Poll::Ready(err_from_io!(Request, e))
136 }
137 Poll::Pending => Poll::Pending,
138 }
139 }
140
encode_various_body<S>( request: &mut Request, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,141 async fn encode_various_body<S>(
142 request: &mut Request,
143 conn: &mut Http1Conn<S>,
144 buf: &mut [u8],
145 ) -> Result<(), HttpClientError>
146 where
147 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
148 {
149 let content_length = request
150 .part()
151 .headers
152 .get("Content-Length")
153 .and_then(|v| v.to_string().ok())
154 .and_then(|v| v.parse::<u64>().ok())
155 .is_some();
156
157 let transfer_encoding = request
158 .part()
159 .headers
160 .get("Transfer-Encoding")
161 .and_then(|v| v.to_string().ok())
162 .map(|v| v.contains("chunked"))
163 .unwrap_or(false);
164
165 let body = request.body_mut();
166
167 match (content_length, transfer_encoding) {
168 (_, true) => {
169 let body = ChunkBody::from_async_reader(body);
170 encode_body(conn, body, buf).await?;
171 }
172 (true, false) => {
173 let body = TextBody::from_async_reader(body);
174 encode_body(conn, body, buf).await?;
175 }
176 (false, false) => {
177 let body = TextBody::from_async_reader(body);
178 encode_body(conn, body, buf).await?;
179 }
180 };
181 Ok(())
182 }
183
encode_request_part<S>( request: &Request, interceptor: &Arc<Interceptors>, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,184 async fn encode_request_part<S>(
185 request: &Request,
186 interceptor: &Arc<Interceptors>,
187 conn: &mut Http1Conn<S>,
188 buf: &mut [u8],
189 ) -> Result<(), HttpClientError>
190 where
191 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
192 {
193 // Encodes and sends Request-line and Headers(non-body fields).
194 let mut part_encoder = RequestEncoder::new(request.part().clone());
195 if conn.raw_mut().is_proxy() && request.uri().scheme() == Some(&Scheme::HTTP) {
196 part_encoder.absolute_uri(true);
197 }
198 loop {
199 match part_encoder.encode(&mut buf[..]) {
200 Ok(0) => break,
201 Ok(written) => {
202 interceptor.intercept_input(&buf[..written])?;
203 // RequestEncoder writes `buf` as much as possible.
204 if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
205 conn.shutdown();
206 return err_from_io!(Request, e);
207 }
208 }
209 Err(e) => {
210 conn.shutdown();
211 return err_from_other!(Request, e);
212 }
213 }
214 }
215 Ok(())
216 }
217
decode_response<S>( mut message: Message, part: ResponsePart, conn: Http1Conn<S>, pre: &[u8], ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,218 fn decode_response<S>(
219 mut message: Message,
220 part: ResponsePart,
221 conn: Http1Conn<S>,
222 pre: &[u8],
223 ) -> Result<Response, HttpClientError>
224 where
225 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
226 {
227 // The shutdown function only sets the current connection to the closed state
228 // and does not release the connection immediately.
229 // Instead, the connection will be completely closed
230 // when the body has finished reading or when the body is released.
231 match part.headers.get("Connection") {
232 None => {
233 if part.version == Version::HTTP1_0 {
234 conn.shutdown()
235 }
236 }
237 Some(value) => {
238 if part.version == Version::HTTP1_0 {
239 if value
240 .to_string()
241 .ok()
242 .and_then(|v| v.find("keep-alive"))
243 .is_none()
244 {
245 conn.shutdown()
246 }
247 } else if value
248 .to_string()
249 .ok()
250 .and_then(|v| v.find("close"))
251 .is_some()
252 {
253 conn.shutdown()
254 }
255 }
256 }
257
258 let length = match BodyLengthParser::new(message.request.ref_mut().method(), &part).parse() {
259 Ok(length) => length,
260 Err(e) => {
261 conn.shutdown();
262 return Err(e);
263 }
264 };
265
266 let time_group = take(message.request.ref_mut().time_group_mut());
267 let body = HttpBody::new(message.interceptor, length, Box::new(conn), pre)?;
268 let mut response = Response::new(ylong_http::response::Response::from_raw_parts(part, body));
269 response.set_time_group(time_group);
270 Ok(response)
271 }
272
encode_body<S, T>( conn: &mut Http1Conn<S>, mut body: T, buf: &mut [u8], ) -> Result<(), HttpClientError> where T: Body, S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,273 async fn encode_body<S, T>(
274 conn: &mut Http1Conn<S>,
275 mut body: T,
276 buf: &mut [u8],
277 ) -> Result<(), HttpClientError>
278 where
279 T: Body,
280 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
281 {
282 // Encodes Request Body.
283 let mut written = 0;
284 let mut end_body = false;
285 while !end_body {
286 if written < buf.len() {
287 let result = body.data(&mut buf[written..]).await;
288 let (read, end) = read_body_result::<S, T>(conn, result)?;
289 written += read;
290 end_body = end;
291 }
292 if written == buf.len() || end_body {
293 conn.speed_controller.init_min_send_if_not_start();
294 conn.speed_controller.init_max_send_if_not_start();
295 let mut write_size = 0;
296 loop {
297 let write_res = poll_fn(|cx| {
298 if conn.speed_controller.poll_send_pending_timeout(cx) {
299 return Poll::Ready(Err(HttpClientError::from_str(
300 BodyTransfer,
301 "Below low speed limit",
302 )));
303 }
304 let write_poll =
305 Pin::new(conn.raw_mut()).poll_write(cx, &buf[write_size..written]);
306 if let Poll::Ready(Ok(_)) = write_poll {
307 conn.speed_controller.reset_send_pending_timeout();
308 }
309 write_poll.map_err(|e| HttpClientError::from_error(BodyTransfer, e))
310 })
311 .await;
312 match write_res {
313 Ok(size) => write_size += size,
314 Err(e) => {
315 conn.shutdown();
316 return Err(e);
317 }
318 }
319 if write_size == written {
320 break;
321 }
322 }
323 if conn.speed_controller.need_limit_max_send_speed() {
324 conn.speed_controller.max_send_speed_limit(written).await;
325 }
326 conn.speed_controller.min_send_speed_limit(written)?;
327 written = 0;
328 }
329 }
330 Ok(())
331 }
332
read_body_result<S, T>( conn: &mut Http1Conn<S>, result: Result<usize, T::Error>, ) -> Result<(usize, bool), HttpClientError> where T: Body,333 fn read_body_result<S, T>(
334 conn: &mut Http1Conn<S>,
335 result: Result<usize, T::Error>,
336 ) -> Result<(usize, bool), HttpClientError>
337 where
338 T: Body,
339 {
340 let mut curr = 0;
341 let mut end_body = false;
342 match result {
343 Ok(0) => end_body = true,
344 Ok(size) => curr = size,
345 Err(e) => {
346 conn.shutdown();
347
348 let error = e.into();
349 // When using `Uploader`, here we can get `UserAborted` error.
350 return if error.source().is_some() {
351 Err(HttpClientError::user_aborted())
352 } else {
353 err_from_other!(BodyTransfer, error)
354 };
355 }
356 }
357 Ok((curr, end_body))
358 }
359
360 impl<S: AsyncRead + Unpin> AsyncRead for Http1Conn<S> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>361 fn poll_read(
362 mut self: Pin<&mut Self>,
363 cx: &mut Context<'_>,
364 buf: &mut ReadBuf<'_>,
365 ) -> Poll<std::io::Result<()>> {
366 if self.speed_controller.poll_recv_pending_timeout(cx) {
367 return Poll::Ready(Err(std::io::Error::new(
368 std::io::ErrorKind::TimedOut,
369 HttpClientError::from_str(BodyTransfer, "Below low speed limit"),
370 )));
371 }
372 self.speed_controller.init_min_recv_if_not_start();
373 if self
374 .speed_controller
375 .poll_max_recv_delay_time(cx)
376 .is_pending()
377 {
378 return Poll::Pending;
379 }
380 self.speed_controller.init_max_recv_if_not_start();
381 match Pin::new(self.raw_mut()).poll_read(cx, buf) {
382 Poll::Ready(Ok(_)) => {
383 let filled: usize = buf.filled().len();
384 self.speed_controller
385 .min_recv_speed_limit(filled)
386 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
387 self.speed_controller.delay_max_recv_speed_limit(filled);
388 self.speed_controller.reset_recv_pending_timeout();
389 Poll::Ready(Ok(()))
390 }
391 Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
392 Poll::Pending => Poll::Pending,
393 }
394 }
395 }
396
397 impl<S: AsyncRead + Unpin> StreamData for Http1Conn<S> {
shutdown(&self)398 fn shutdown(&self) {
399 Self::shutdown(self)
400 }
401
402 // HTTP1 can close the "stream" after reading the data
is_stream_closable(&self) -> bool403 fn is_stream_closable(&self) -> bool {
404 true
405 }
406
http_version(&self) -> HttpVersion407 fn http_version(&self) -> HttpVersion {
408 HttpVersion::Http1
409 }
410 }
411