• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::mem::take;
15 use std::pin::Pin;
16 use std::sync::Arc;
17 use std::task::{Context, Poll};
18 use std::time::Instant;
19 
20 use ylong_http::body::async_impl::Body;
21 use ylong_http::body::{ChunkBody, TextBody};
22 use ylong_http::h1::{RequestEncoder, ResponseDecoder};
23 use ylong_http::request::uri::Scheme;
24 use ylong_http::response::ResponsePart;
25 use ylong_http::version::Version;
26 
27 use super::StreamData;
28 use crate::async_impl::request::Message;
29 use crate::async_impl::{HttpBody, Request, Response};
30 use crate::error::HttpClientError;
31 use crate::runtime::{poll_fn, AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf};
32 use crate::util::config::HttpVersion;
33 use crate::util::dispatcher::http1::Http1Conn;
34 use crate::util::information::ConnInfo;
35 use crate::util::interceptor::Interceptors;
36 use crate::util::normalizer::BodyLengthParser;
37 use crate::ErrorKind::BodyTransfer;
38 
39 const TEMP_BUF_SIZE: usize = 16 * 1024;
40 
request<S>( mut conn: Http1Conn<S>, mut message: Message, ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,41 pub(crate) async fn request<S>(
42     mut conn: Http1Conn<S>,
43     mut message: Message,
44 ) -> Result<Response, HttpClientError>
45 where
46     S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
47 {
48     message
49         .interceptor
50         .intercept_request(message.request.ref_mut())?;
51     let mut buf = vec![0u8; TEMP_BUF_SIZE];
52 
53     message
54         .request
55         .ref_mut()
56         .time_group_mut()
57         .set_transfer_start(Instant::now());
58     let mut guard = conn.cancel_guard();
59     encode_request_part(
60         message.request.ref_mut(),
61         &message.interceptor,
62         &mut conn,
63         &mut buf,
64     )
65     .await?;
66     encode_various_body(message.request.ref_mut(), &mut conn, &mut buf).await?;
67     // Decodes response part.
68     let (part, pre) = {
69         let mut decoder = ResponseDecoder::new();
70         loop {
71             let size = poll_fn(|cx| {
72                 if conn.speed_controller.poll_recv_pending_timeout(cx) {
73                     return Poll::Ready(Err(HttpClientError::from_str(
74                         BodyTransfer,
75                         "Below low speed limit",
76                     )));
77                 }
78                 let result =
79                     read_status_line(cx, &mut conn, message.request.ref_mut(), buf.as_mut_slice())?;
80                 if let Poll::Ready(filled) = result {
81                     conn.speed_controller.reset_recv_pending_timeout();
82                     return Poll::Ready(Ok(filled));
83                 }
84                 Poll::Pending
85             })
86             .await?;
87 
88             message.interceptor.intercept_output(&buf[..size])?;
89             match decoder.decode(&buf[..size]) {
90                 Ok(None) => {}
91                 Ok(Some((part, rem))) => break (part, rem),
92                 Err(e) => {
93                     conn.shutdown();
94                     return err_from_other!(Request, e);
95                 }
96             }
97         }
98     };
99     guard.normal_end();
100     // if task cancel occurs, we should shutdown io
101     drop(guard);
102 
103     decode_response(message, part, conn, pre)
104 }
105 
read_status_line<S>( cx: &mut Context<'_>, conn: &mut Http1Conn<S>, request: &mut Request, buf: &mut [u8], ) -> Poll<Result<usize, HttpClientError>> where S: AsyncRead + Sync + Send + Unpin + 'static,106 fn read_status_line<S>(
107     cx: &mut Context<'_>,
108     conn: &mut Http1Conn<S>,
109     request: &mut Request,
110     buf: &mut [u8],
111 ) -> Poll<Result<usize, HttpClientError>>
112 where
113     S: AsyncRead + Sync + Send + Unpin + 'static,
114 {
115     let mut read_buf = ReadBuf::new(buf);
116     match Pin::new(conn.raw_mut()).poll_read(cx, &mut read_buf) {
117         Poll::Ready(Ok(_)) => {
118             #[cfg(feature = "ylong_base")]
119             let size = read_buf.filled_len();
120 
121             #[cfg(feature = "tokio_base")]
122             let size = read_buf.filled().len();
123 
124             if size == 0 {
125                 conn.shutdown();
126                 return Poll::Ready(err_from_msg!(Request, "Tcp closed"));
127             }
128             if request.time_group_mut().transfer_end_time().is_none() {
129                 request.time_group_mut().set_transfer_end(Instant::now())
130             }
131             Poll::Ready(Ok(size))
132         }
133         Poll::Ready(Err(e)) => {
134             conn.shutdown();
135             Poll::Ready(err_from_io!(Request, e))
136         }
137         Poll::Pending => Poll::Pending,
138     }
139 }
140 
encode_various_body<S>( request: &mut Request, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,141 async fn encode_various_body<S>(
142     request: &mut Request,
143     conn: &mut Http1Conn<S>,
144     buf: &mut [u8],
145 ) -> Result<(), HttpClientError>
146 where
147     S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
148 {
149     let content_length = request
150         .part()
151         .headers
152         .get("Content-Length")
153         .and_then(|v| v.to_string().ok())
154         .and_then(|v| v.parse::<u64>().ok())
155         .is_some();
156 
157     let transfer_encoding = request
158         .part()
159         .headers
160         .get("Transfer-Encoding")
161         .and_then(|v| v.to_string().ok())
162         .map(|v| v.contains("chunked"))
163         .unwrap_or(false);
164 
165     let body = request.body_mut();
166 
167     match (content_length, transfer_encoding) {
168         (_, true) => {
169             let body = ChunkBody::from_async_reader(body);
170             encode_body(conn, body, buf).await?;
171         }
172         (true, false) => {
173             let body = TextBody::from_async_reader(body);
174             encode_body(conn, body, buf).await?;
175         }
176         (false, false) => {
177             let body = TextBody::from_async_reader(body);
178             encode_body(conn, body, buf).await?;
179         }
180     };
181     Ok(())
182 }
183 
encode_request_part<S>( request: &Request, interceptor: &Arc<Interceptors>, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,184 async fn encode_request_part<S>(
185     request: &Request,
186     interceptor: &Arc<Interceptors>,
187     conn: &mut Http1Conn<S>,
188     buf: &mut [u8],
189 ) -> Result<(), HttpClientError>
190 where
191     S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
192 {
193     // Encodes and sends Request-line and Headers(non-body fields).
194     let mut part_encoder = RequestEncoder::new(request.part().clone());
195     if conn.raw_mut().is_proxy() && request.uri().scheme() == Some(&Scheme::HTTP) {
196         part_encoder.absolute_uri(true);
197     }
198     loop {
199         match part_encoder.encode(&mut buf[..]) {
200             Ok(0) => break,
201             Ok(written) => {
202                 interceptor.intercept_input(&buf[..written])?;
203                 // RequestEncoder writes `buf` as much as possible.
204                 if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
205                     conn.shutdown();
206                     return err_from_io!(Request, e);
207                 }
208             }
209             Err(e) => {
210                 conn.shutdown();
211                 return err_from_other!(Request, e);
212             }
213         }
214     }
215     Ok(())
216 }
217 
decode_response<S>( mut message: Message, part: ResponsePart, conn: Http1Conn<S>, pre: &[u8], ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,218 fn decode_response<S>(
219     mut message: Message,
220     part: ResponsePart,
221     conn: Http1Conn<S>,
222     pre: &[u8],
223 ) -> Result<Response, HttpClientError>
224 where
225     S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
226 {
227     // The shutdown function only sets the current connection to the closed state
228     // and does not release the connection immediately.
229     // Instead, the connection will be completely closed
230     // when the body has finished reading or when the body is released.
231     match part.headers.get("Connection") {
232         None => {
233             if part.version == Version::HTTP1_0 {
234                 conn.shutdown()
235             }
236         }
237         Some(value) => {
238             if part.version == Version::HTTP1_0 {
239                 if value
240                     .to_string()
241                     .ok()
242                     .and_then(|v| v.find("keep-alive"))
243                     .is_none()
244                 {
245                     conn.shutdown()
246                 }
247             } else if value
248                 .to_string()
249                 .ok()
250                 .and_then(|v| v.find("close"))
251                 .is_some()
252             {
253                 conn.shutdown()
254             }
255         }
256     }
257 
258     let length = match BodyLengthParser::new(message.request.ref_mut().method(), &part).parse() {
259         Ok(length) => length,
260         Err(e) => {
261             conn.shutdown();
262             return Err(e);
263         }
264     };
265 
266     let time_group = take(message.request.ref_mut().time_group_mut());
267     let body = HttpBody::new(message.interceptor, length, Box::new(conn), pre)?;
268     let mut response = Response::new(ylong_http::response::Response::from_raw_parts(part, body));
269     response.set_time_group(time_group);
270     Ok(response)
271 }
272 
encode_body<S, T>( conn: &mut Http1Conn<S>, mut body: T, buf: &mut [u8], ) -> Result<(), HttpClientError> where T: Body, S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,273 async fn encode_body<S, T>(
274     conn: &mut Http1Conn<S>,
275     mut body: T,
276     buf: &mut [u8],
277 ) -> Result<(), HttpClientError>
278 where
279     T: Body,
280     S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
281 {
282     // Encodes Request Body.
283     let mut written = 0;
284     let mut end_body = false;
285     while !end_body {
286         if written < buf.len() {
287             let result = body.data(&mut buf[written..]).await;
288             let (read, end) = read_body_result::<S, T>(conn, result)?;
289             written += read;
290             end_body = end;
291         }
292         if written == buf.len() || end_body {
293             conn.speed_controller.init_min_send_if_not_start();
294             conn.speed_controller.init_max_send_if_not_start();
295             let mut write_size = 0;
296             loop {
297                 let write_res = poll_fn(|cx| {
298                     if conn.speed_controller.poll_send_pending_timeout(cx) {
299                         return Poll::Ready(Err(HttpClientError::from_str(
300                             BodyTransfer,
301                             "Below low speed limit",
302                         )));
303                     }
304                     let write_poll =
305                         Pin::new(conn.raw_mut()).poll_write(cx, &buf[write_size..written]);
306                     if let Poll::Ready(Ok(_)) = write_poll {
307                         conn.speed_controller.reset_send_pending_timeout();
308                     }
309                     write_poll.map_err(|e| HttpClientError::from_error(BodyTransfer, e))
310                 })
311                 .await;
312                 match write_res {
313                     Ok(size) => write_size += size,
314                     Err(e) => {
315                         conn.shutdown();
316                         return Err(e);
317                     }
318                 }
319                 if write_size == written {
320                     break;
321                 }
322             }
323             if conn.speed_controller.need_limit_max_send_speed() {
324                 conn.speed_controller.max_send_speed_limit(written).await;
325             }
326             conn.speed_controller.min_send_speed_limit(written)?;
327             written = 0;
328         }
329     }
330     Ok(())
331 }
332 
read_body_result<S, T>( conn: &mut Http1Conn<S>, result: Result<usize, T::Error>, ) -> Result<(usize, bool), HttpClientError> where T: Body,333 fn read_body_result<S, T>(
334     conn: &mut Http1Conn<S>,
335     result: Result<usize, T::Error>,
336 ) -> Result<(usize, bool), HttpClientError>
337 where
338     T: Body,
339 {
340     let mut curr = 0;
341     let mut end_body = false;
342     match result {
343         Ok(0) => end_body = true,
344         Ok(size) => curr = size,
345         Err(e) => {
346             conn.shutdown();
347 
348             let error = e.into();
349             // When using `Uploader`, here we can get `UserAborted` error.
350             return if error.source().is_some() {
351                 Err(HttpClientError::user_aborted())
352             } else {
353                 err_from_other!(BodyTransfer, error)
354             };
355         }
356     }
357     Ok((curr, end_body))
358 }
359 
360 impl<S: AsyncRead + Unpin> AsyncRead for Http1Conn<S> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>361     fn poll_read(
362         mut self: Pin<&mut Self>,
363         cx: &mut Context<'_>,
364         buf: &mut ReadBuf<'_>,
365     ) -> Poll<std::io::Result<()>> {
366         if self.speed_controller.poll_recv_pending_timeout(cx) {
367             return Poll::Ready(Err(std::io::Error::new(
368                 std::io::ErrorKind::TimedOut,
369                 HttpClientError::from_str(BodyTransfer, "Below low speed limit"),
370             )));
371         }
372         self.speed_controller.init_min_recv_if_not_start();
373         if self
374             .speed_controller
375             .poll_max_recv_delay_time(cx)
376             .is_pending()
377         {
378             return Poll::Pending;
379         }
380         self.speed_controller.init_max_recv_if_not_start();
381         match Pin::new(self.raw_mut()).poll_read(cx, buf) {
382             Poll::Ready(Ok(_)) => {
383                 let filled: usize = buf.filled().len();
384                 self.speed_controller
385                     .min_recv_speed_limit(filled)
386                     .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
387                 self.speed_controller.delay_max_recv_speed_limit(filled);
388                 self.speed_controller.reset_recv_pending_timeout();
389                 Poll::Ready(Ok(()))
390             }
391             Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
392             Poll::Pending => Poll::Pending,
393         }
394     }
395 }
396 
397 impl<S: AsyncRead + Unpin> StreamData for Http1Conn<S> {
shutdown(&self)398     fn shutdown(&self) {
399         Self::shutdown(self)
400     }
401 
402     // HTTP1 can close the "stream" after reading the data
is_stream_closable(&self) -> bool403     fn is_stream_closable(&self) -> bool {
404         true
405     }
406 
http_version(&self) -> HttpVersion407     fn http_version(&self) -> HttpVersion {
408         HttpVersion::Http1
409     }
410 }
411