1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::mem::take;
15 use std::pin::Pin;
16 use std::sync::Arc;
17 use std::task::{Context, Poll};
18 use std::time::Instant;
19
20 use ylong_http::body::async_impl::Body;
21 use ylong_http::body::{ChunkBody, TextBody};
22 use ylong_http::h1::{RequestEncoder, ResponseDecoder};
23 use ylong_http::request::uri::Scheme;
24 use ylong_http::response::ResponsePart;
25 use ylong_http::version::Version;
26
27 use super::StreamData;
28 use crate::async_impl::request::Message;
29 use crate::async_impl::{HttpBody, Request, Response};
30 use crate::error::HttpClientError;
31 use crate::runtime::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
32 use crate::util::dispatcher::http1::Http1Conn;
33 use crate::util::information::ConnInfo;
34 use crate::util::interceptor::Interceptors;
35 use crate::util::normalizer::BodyLengthParser;
36
/// Size in bytes (16 KiB) of the temporary buffer allocated per request; the
/// same buffer is used for encoding the outgoing request and for reading the
/// response head.
const TEMP_BUF_SIZE: usize = 16 * 1024;
38
request<S>( mut conn: Http1Conn<S>, mut message: Message, ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,39 pub(crate) async fn request<S>(
40 mut conn: Http1Conn<S>,
41 mut message: Message,
42 ) -> Result<Response, HttpClientError>
43 where
44 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
45 {
46 message
47 .interceptor
48 .intercept_request(message.request.ref_mut())?;
49 let mut buf = vec![0u8; TEMP_BUF_SIZE];
50
51 message
52 .request
53 .ref_mut()
54 .time_group_mut()
55 .set_transfer_start(Instant::now());
56 encode_request_part(
57 message.request.ref_mut(),
58 &message.interceptor,
59 &mut conn,
60 &mut buf,
61 )
62 .await?;
63 encode_various_body(message.request.ref_mut(), &mut conn, &mut buf).await?;
64 // Decodes response part.
65 let (part, pre) = {
66 let mut decoder = ResponseDecoder::new();
67 loop {
68 let size = match conn.raw_mut().read(buf.as_mut_slice()).await {
69 Ok(0) => {
70 conn.shutdown();
71 return err_from_msg!(Request, "Tcp closed");
72 }
73 Ok(size) => {
74 if message
75 .request
76 .ref_mut()
77 .time_group_mut()
78 .transfer_end_time()
79 .is_none()
80 {
81 message
82 .request
83 .ref_mut()
84 .time_group_mut()
85 .set_transfer_end(Instant::now())
86 }
87 size
88 }
89 Err(e) => {
90 conn.shutdown();
91 return err_from_io!(Request, e);
92 }
93 };
94
95 message.interceptor.intercept_output(&buf[..size])?;
96 match decoder.decode(&buf[..size]) {
97 Ok(None) => {}
98 Ok(Some((part, rem))) => break (part, rem),
99 Err(e) => {
100 conn.shutdown();
101 return err_from_other!(Request, e);
102 }
103 }
104 }
105 };
106
107 decode_response(message, part, conn, pre)
108 }
109
encode_various_body<S>( request: &mut Request, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,110 async fn encode_various_body<S>(
111 request: &mut Request,
112 conn: &mut Http1Conn<S>,
113 buf: &mut [u8],
114 ) -> Result<(), HttpClientError>
115 where
116 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
117 {
118 let content_length = request
119 .part()
120 .headers
121 .get("Content-Length")
122 .and_then(|v| v.to_string().ok())
123 .and_then(|v| v.parse::<u64>().ok())
124 .is_some();
125
126 let transfer_encoding = request
127 .part()
128 .headers
129 .get("Transfer-Encoding")
130 .and_then(|v| v.to_string().ok())
131 .map(|v| v.contains("chunked"))
132 .unwrap_or(false);
133
134 let body = request.body_mut();
135
136 match (content_length, transfer_encoding) {
137 (_, true) => {
138 let body = ChunkBody::from_async_reader(body);
139 encode_body(conn, body, buf).await?;
140 }
141 (true, false) => {
142 let body = TextBody::from_async_reader(body);
143 encode_body(conn, body, buf).await?;
144 }
145 (false, false) => {
146 let body = TextBody::from_async_reader(body);
147 encode_body(conn, body, buf).await?;
148 }
149 };
150 Ok(())
151 }
152
encode_request_part<S>( request: &Request, interceptor: &Arc<Interceptors>, conn: &mut Http1Conn<S>, buf: &mut [u8], ) -> Result<(), HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,153 async fn encode_request_part<S>(
154 request: &Request,
155 interceptor: &Arc<Interceptors>,
156 conn: &mut Http1Conn<S>,
157 buf: &mut [u8],
158 ) -> Result<(), HttpClientError>
159 where
160 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
161 {
162 // Encodes and sends Request-line and Headers(non-body fields).
163 let mut part_encoder = RequestEncoder::new(request.part().clone());
164 if conn.raw_mut().is_proxy() && request.uri().scheme() == Some(&Scheme::HTTP) {
165 part_encoder.absolute_uri(true);
166 }
167 loop {
168 match part_encoder.encode(&mut buf[..]) {
169 Ok(0) => break,
170 Ok(written) => {
171 interceptor.intercept_input(&buf[..written])?;
172 // RequestEncoder writes `buf` as much as possible.
173 if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
174 conn.shutdown();
175 return err_from_io!(Request, e);
176 }
177 }
178 Err(e) => {
179 conn.shutdown();
180 return err_from_other!(Request, e);
181 }
182 }
183 }
184 Ok(())
185 }
186
decode_response<S>( mut message: Message, part: ResponsePart, conn: Http1Conn<S>, pre: &[u8], ) -> Result<Response, HttpClientError> where S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,187 fn decode_response<S>(
188 mut message: Message,
189 part: ResponsePart,
190 conn: Http1Conn<S>,
191 pre: &[u8],
192 ) -> Result<Response, HttpClientError>
193 where
194 S: AsyncRead + AsyncWrite + ConnInfo + Sync + Send + Unpin + 'static,
195 {
196 // The shutdown function only sets the current connection to the closed state
197 // and does not release the connection immediately.
198 // Instead, the connection will be completely closed
199 // when the body has finished reading or when the body is released.
200 match part.headers.get("Connection") {
201 None => {
202 if part.version == Version::HTTP1_0 {
203 conn.shutdown()
204 }
205 }
206 Some(value) => {
207 if part.version == Version::HTTP1_0 {
208 if value
209 .to_string()
210 .ok()
211 .and_then(|v| v.find("keep-alive"))
212 .is_none()
213 {
214 conn.shutdown()
215 }
216 } else if value
217 .to_string()
218 .ok()
219 .and_then(|v| v.find("close"))
220 .is_some()
221 {
222 conn.shutdown()
223 }
224 }
225 }
226
227 let length = match BodyLengthParser::new(message.request.ref_mut().method(), &part).parse() {
228 Ok(length) => length,
229 Err(e) => {
230 conn.shutdown();
231 return Err(e);
232 }
233 };
234
235 let time_group = take(message.request.ref_mut().time_group_mut());
236 let body = HttpBody::new(message.interceptor, length, Box::new(conn), pre)?;
237 let mut response = Response::new(ylong_http::response::Response::from_raw_parts(part, body));
238 response.set_time_group(time_group);
239 Ok(response)
240 }
241
encode_body<S, T>( conn: &mut Http1Conn<S>, mut body: T, buf: &mut [u8], ) -> Result<(), HttpClientError> where T: Body, S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,242 async fn encode_body<S, T>(
243 conn: &mut Http1Conn<S>,
244 mut body: T,
245 buf: &mut [u8],
246 ) -> Result<(), HttpClientError>
247 where
248 T: Body,
249 S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
250 {
251 // Encodes Request Body.
252 let mut written = 0;
253 let mut end_body = false;
254 while !end_body {
255 if written < buf.len() {
256 let result = body.data(&mut buf[written..]).await;
257 let (read, end) = read_body_result::<S, T>(conn, result)?;
258 written += read;
259 end_body = end;
260 }
261 if written == buf.len() || end_body {
262 if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
263 conn.shutdown();
264 return err_from_io!(BodyTransfer, e);
265 }
266 written = 0;
267 }
268 }
269 Ok(())
270 }
271
read_body_result<S, T>( conn: &mut Http1Conn<S>, result: Result<usize, T::Error>, ) -> Result<(usize, bool), HttpClientError> where T: Body,272 fn read_body_result<S, T>(
273 conn: &mut Http1Conn<S>,
274 result: Result<usize, T::Error>,
275 ) -> Result<(usize, bool), HttpClientError>
276 where
277 T: Body,
278 {
279 let mut curr = 0;
280 let mut end_body = false;
281 match result {
282 Ok(0) => end_body = true,
283 Ok(size) => curr = size,
284 Err(e) => {
285 conn.shutdown();
286
287 let error = e.into();
288 // When using `Uploader`, here we can get `UserAborted` error.
289 return if error.source().is_some() {
290 Err(HttpClientError::user_aborted())
291 } else {
292 err_from_other!(BodyTransfer, error)
293 };
294 }
295 }
296 Ok((curr, end_body))
297 }
298
299 impl<S: AsyncRead + Unpin> AsyncRead for Http1Conn<S> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>300 fn poll_read(
301 mut self: Pin<&mut Self>,
302 cx: &mut Context<'_>,
303 buf: &mut ReadBuf<'_>,
304 ) -> Poll<std::io::Result<()>> {
305 Pin::new(self.raw_mut()).poll_read(cx, buf)
306 }
307 }
308
309 impl<S: AsyncRead + Unpin> StreamData for Http1Conn<S> {
shutdown(&self)310 fn shutdown(&self) {
311 Self::shutdown(self)
312 }
313
314 // HTTP1 can close the "stream" after reading the data
is_stream_closable(&self) -> bool315 fn is_stream_closable(&self) -> bool {
316 true
317 }
318 }
319