• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::mem::take;
15 use std::pin::Pin;
16 use std::sync::Arc;
17 use std::task::{Context, Poll};
18 use std::time::Instant;
19 
20 use ylong_http::body::async_impl::Body;
21 use ylong_http::body::{ChunkBody, TextBody};
22 use ylong_http::h1::{RequestEncoder, ResponseDecoder};
23 use ylong_http::request::uri::Scheme;
24 use ylong_http::response::ResponsePart;
25 use ylong_http::version::Version;
26 
27 use super::StreamData;
28 use crate::async_impl::request::Message;
29 use crate::async_impl::{HttpBody, Request, Response};
30 use crate::error::HttpClientError;
31 use crate::runtime::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
32 use crate::util::dispatcher::http1::Http1Conn;
33 use crate::util::information::ConnInfo;
34 use crate::util::interceptor::Interceptors;
35 use crate::util::normalizer::BodyLengthParser;
36 
/// Size in bytes (16 KiB) of the scratch buffer that `request` allocates once
/// and reuses for encoding the request and for reading the response head.
const TEMP_BUF_SIZE: usize = 16 * 1024;
38 
request<S>( mut conn: Http1Conn<S>, mut message: Message, ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,39 pub(crate) async fn request<S>(
40     mut conn: Http1Conn<S>,
41     mut message: Message,
42 ) -> Result<Response, HttpClientError>
43 where
44     S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
45 {
46     message
47         .interceptor
48         .intercept_request(message.request.ref_mut())?;
49     let mut buf = vec![0u8; TEMP_BUF_SIZE];
50 
51     message
52         .request
53         .ref_mut()
54         .time_group_mut()
55         .set_transfer_start(Instant::now());
56     encode_request_part(
57         message.request.ref_mut(),
58         &message.interceptor,
59         &mut conn,
60         &mut buf,
61     )
62     .await?;
63     encode_various_body(message.request.ref_mut(), &mut conn, &mut buf).await?;
64     // Decodes response part.
65     let (part, pre) = {
66         let mut decoder = ResponseDecoder::new();
67         loop {
68             let size = match conn.raw_mut().read(buf.as_mut_slice()).await {
69                 Ok(0) => {
70                     conn.shutdown();
71                     return err_from_msg!(Request, "Tcp closed");
72                 }
73                 Ok(size) => {
74                     if message
75                         .request
76                         .ref_mut()
77                         .time_group_mut()
78                         .transfer_end_time()
79                         .is_none()
80                     {
81                         message
82                             .request
83                             .ref_mut()
84                             .time_group_mut()
85                             .set_transfer_end(Instant::now())
86                     }
87                     size
88                 }
89                 Err(e) => {
90                     conn.shutdown();
91                     return err_from_io!(Request, e);
92                 }
93             };
94 
95             message.interceptor.intercept_output(&buf[..size])?;
96             match decoder.decode(&buf[..size]) {
97                 Ok(None) => {}
98                 Ok(Some((part, rem))) => break (part, rem),
99                 Err(e) => {
100                     conn.shutdown();
101                     return err_from_other!(Request, e);
102                 }
103             }
104         }
105     };
106 
107     decode_response(message, part, conn, pre)
108 }
109 
encode_various_body<S>( request: &mut Request, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,110 async fn encode_various_body<S>(
111     request: &mut Request,
112     conn: &mut Http1Conn<S>,
113     buf: &mut [u8],
114 ) -> Result<(), HttpClientError>
115 where
116     S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
117 {
118     let content_length = request
119         .part()
120         .headers
121         .get("Content-Length")
122         .and_then(|v| v.to_string().ok())
123         .and_then(|v| v.parse::<u64>().ok())
124         .is_some();
125 
126     let transfer_encoding = request
127         .part()
128         .headers
129         .get("Transfer-Encoding")
130         .and_then(|v| v.to_string().ok())
131         .map(|v| v.contains("chunked"))
132         .unwrap_or(false);
133 
134     let body = request.body_mut();
135 
136     match (content_length, transfer_encoding) {
137         (_, true) => {
138             let body = ChunkBody::from_async_reader(body);
139             encode_body(conn, body, buf).await?;
140         }
141         (true, false) => {
142             let body = TextBody::from_async_reader(body);
143             encode_body(conn, body, buf).await?;
144         }
145         (false, false) => {
146             let body = TextBody::from_async_reader(body);
147             encode_body(conn, body, buf).await?;
148         }
149     };
150     Ok(())
151 }
152 
encode_request_part<S>( request: &Request, interceptor: &Arc<Interceptors>, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,153 async fn encode_request_part<S>(
154     request: &Request,
155     interceptor: &Arc<Interceptors>,
156     conn: &mut Http1Conn<S>,
157     buf: &mut [u8],
158 ) -> Result<(), HttpClientError>
159 where
160     S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
161 {
162     // Encodes and sends Request-line and Headers(non-body fields).
163     let mut part_encoder = RequestEncoder::new(request.part().clone());
164     if conn.raw_mut().is_proxy() && request.uri().scheme() == Some(&Scheme::HTTP) {
165         part_encoder.absolute_uri(true);
166     }
167     loop {
168         match part_encoder.encode(&mut buf[..]) {
169             Ok(0) => break,
170             Ok(written) => {
171                 interceptor.intercept_input(&buf[..written])?;
172                 // RequestEncoder writes `buf` as much as possible.
173                 if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
174                     conn.shutdown();
175                     return err_from_io!(Request, e);
176                 }
177             }
178             Err(e) => {
179                 conn.shutdown();
180                 return err_from_other!(Request, e);
181             }
182         }
183     }
184     Ok(())
185 }
186 
decode_response<S>( mut message: Message, part: ResponsePart, conn: Http1Conn<S>, pre: &[u8], ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,187 fn decode_response<S>(
188     mut message: Message,
189     part: ResponsePart,
190     conn: Http1Conn<S>,
191     pre: &[u8],
192 ) -> Result<Response, HttpClientError>
193 where
194     S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
195 {
196     // The shutdown function only sets the current connection to the closed state
197     // and does not release the connection immediately.
198     // Instead, the connection will be completely closed
199     // when the body has finished reading or when the body is released.
200     match part.headers.get("Connection") {
201         None => {
202             if part.version == Version::HTTP1_0 {
203                 conn.shutdown()
204             }
205         }
206         Some(value) => {
207             if part.version == Version::HTTP1_0 {
208                 if value
209                     .to_string()
210                     .ok()
211                     .and_then(|v| v.find("keep-alive"))
212                     .is_none()
213                 {
214                     conn.shutdown()
215                 }
216             } else if value
217                 .to_string()
218                 .ok()
219                 .and_then(|v| v.find("close"))
220                 .is_some()
221             {
222                 conn.shutdown()
223             }
224         }
225     }
226 
227     let length = match BodyLengthParser::new(message.request.ref_mut().method(), &part).parse() {
228         Ok(length) => length,
229         Err(e) => {
230             conn.shutdown();
231             return Err(e);
232         }
233     };
234 
235     let time_group = take(message.request.ref_mut().time_group_mut());
236     let body = HttpBody::new(message.interceptor, length, Box::new(conn), pre)?;
237     let mut response = Response::new(ylong_http::response::Response::from_raw_parts(part, body));
238     response.set_time_group(time_group);
239     Ok(response)
240 }
241 
encode_body<S, T>( conn: &mut Http1Conn<S>, mut body: T, buf: &mut [u8], ) -> Result<(), HttpClientError> where T: Body, S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,242 async fn encode_body<S, T>(
243     conn: &mut Http1Conn<S>,
244     mut body: T,
245     buf: &mut [u8],
246 ) -> Result<(), HttpClientError>
247 where
248     T: Body,
249     S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
250 {
251     // Encodes Request Body.
252     let mut written = 0;
253     let mut end_body = false;
254     while !end_body {
255         if written < buf.len() {
256             let result = body.data(&mut buf[written..]).await;
257             let (read, end) = read_body_result::<S, T>(conn, result)?;
258             written += read;
259             end_body = end;
260         }
261         if written == buf.len() || end_body {
262             if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
263                 conn.shutdown();
264                 return err_from_io!(BodyTransfer, e);
265             }
266             written = 0;
267         }
268     }
269     Ok(())
270 }
271 
read_body_result<S, T>( conn: &mut Http1Conn<S>, result: Result<usize, T::Error>, ) -> Result<(usize, bool), HttpClientError> where T: Body,272 fn read_body_result<S, T>(
273     conn: &mut Http1Conn<S>,
274     result: Result<usize, T::Error>,
275 ) -> Result<(usize, bool), HttpClientError>
276 where
277     T: Body,
278 {
279     let mut curr = 0;
280     let mut end_body = false;
281     match result {
282         Ok(0) => end_body = true,
283         Ok(size) => curr = size,
284         Err(e) => {
285             conn.shutdown();
286 
287             let error = e.into();
288             // When using `Uploader`, here we can get `UserAborted` error.
289             return if error.source().is_some() {
290                 Err(HttpClientError::user_aborted())
291             } else {
292                 err_from_other!(BodyTransfer, error)
293             };
294         }
295     }
296     Ok((curr, end_body))
297 }
298 
299 impl<S: AsyncRead + Unpin> AsyncRead for Http1Conn<S> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>300     fn poll_read(
301         mut self: Pin<&mut Self>,
302         cx: &mut Context<'_>,
303         buf: &mut ReadBuf<'_>,
304     ) -> Poll<std::io::Result<()>> {
305         Pin::new(self.raw_mut()).poll_read(cx, buf)
306     }
307 }
308 
309 impl<S: AsyncRead + Unpin> StreamData for Http1Conn<S> {
shutdown(&self)310     fn shutdown(&self) {
311         Self::shutdown(self)
312     }
313 
314     // HTTP1 can close the "stream" after reading the data
is_stream_closable(&self) -> bool315     fn is_stream_closable(&self) -> bool {
316         true
317     }
318 }
319