• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::pin::Pin;
15 use std::task::{Context, Poll};
16 
17 use ylong_http::h1::{RequestEncoder, ResponseDecoder};
18 use ylong_http::response::Response;
19 
20 use crate::async_impl::{Body, HttpBody, StreamData};
21 use crate::error::{ErrorKind, HttpClientError};
22 use crate::util::dispatcher::http1::Http1Conn;
23 use crate::util::normalizer::BodyLengthParser;
24 use crate::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf, Request};
25 
26 const TEMP_BUF_SIZE: usize = 16 * 1024;
27 
request<S, T>( mut conn: Http1Conn<S>, request: &mut Request<T>, ) -> Result<Response<HttpBody>, HttpClientError> where T: Body, S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,28 pub(crate) async fn request<S, T>(
29     mut conn: Http1Conn<S>,
30     request: &mut Request<T>,
31 ) -> Result<Response<HttpBody>, HttpClientError>
32 where
33     T: Body,
34     S: AsyncRead + AsyncWrite + Sync + Send + Unpin + 'static,
35 {
36     let mut buf = vec![0u8; TEMP_BUF_SIZE];
37 
38     // Encodes and sends Request-line and Headers(non-body fields).
39     let mut non_body = RequestEncoder::new(request.part().clone());
40     non_body.set_proxy(true);
41     loop {
42         match non_body.encode(&mut buf[..]) {
43             Ok(0) => break,
44             Ok(written) => {
45                 // RequestEncoder writes `buf` as much as possible.
46                 if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
47                     conn.shutdown();
48                     return Err(HttpClientError::new_with_cause(ErrorKind::Request, Some(e)));
49                 }
50             }
51             Err(e) => {
52                 conn.shutdown();
53                 return Err(HttpClientError::new_with_cause(ErrorKind::Request, Some(e)));
54             }
55         }
56     }
57 
58     // Encodes Request Body.
59     let body = request.body_mut();
60     let mut written = 0;
61     let mut end_body = false;
62     while !end_body {
63         if written < buf.len() {
64             match body.data(&mut buf[written..]).await {
65                 Ok(0) => end_body = true,
66                 Ok(size) => written += size,
67                 Err(e) => {
68                     conn.shutdown();
69                     return Err(HttpClientError::new_with_cause(
70                         ErrorKind::BodyTransfer,
71                         Some(e),
72                     ));
73                 }
74             }
75         }
76         if written == buf.len() || end_body {
77             if let Err(e) = conn.raw_mut().write_all(&buf[..written]).await {
78                 conn.shutdown();
79                 return Err(HttpClientError::new_with_cause(
80                     ErrorKind::BodyTransfer,
81                     Some(e),
82                 ));
83             }
84             written = 0;
85         }
86     }
87 
88     // Decodes response part.
89     let (part, pre) = {
90         let mut decoder = ResponseDecoder::new();
91         loop {
92             let size = match conn.raw_mut().read(buf.as_mut_slice()).await {
93                 Ok(0) => {
94                     conn.shutdown();
95                     return Err(HttpClientError::new_with_message(
96                         ErrorKind::Request,
97                         "Tcp Closed",
98                     ));
99                 }
100                 Ok(size) => size,
101                 Err(e) => {
102                     conn.shutdown();
103                     return Err(HttpClientError::new_with_cause(ErrorKind::Request, Some(e)));
104                 }
105             };
106 
107             match decoder.decode(&buf[..size]) {
108                 Ok(None) => {}
109                 Ok(Some((part, rem))) => break (part, rem),
110                 Err(e) => {
111                     conn.shutdown();
112                     return Err(HttpClientError::new_with_cause(ErrorKind::Request, Some(e)));
113                 }
114             }
115         }
116     };
117 
118     let length = match BodyLengthParser::new(request.method(), &part).parse() {
119         Ok(length) => length,
120         Err(e) => {
121             conn.shutdown();
122             return Err(e);
123         }
124     };
125 
126     let body = HttpBody::new(length, Box::new(conn), pre)?;
127     Ok(Response::from_raw_parts(part, body))
128 }
129 
130 impl<S: AsyncRead + Unpin> AsyncRead for Http1Conn<S> {
poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<std::io::Result<()>>131     fn poll_read(
132         mut self: Pin<&mut Self>,
133         cx: &mut Context<'_>,
134         buf: &mut ReadBuf<'_>,
135     ) -> Poll<std::io::Result<()>> {
136         Pin::new(self.raw_mut()).poll_read(cx, buf)
137     }
138 }
139 
140 impl<S: AsyncRead + Unpin> StreamData for Http1Conn<S> {
shutdown(&self)141     fn shutdown(&self) {
142         Self::shutdown(self)
143     }
144 }
145