• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Asynchronous `Connector` trait and `HttpConnector` implementation.
15 
16 use core::future::Future;
17 use std::error::Error;
18 use std::io;
19 use std::net::ToSocketAddrs;
20 
21 use crate::util::ConnectorConfig;
22 use crate::{AsyncRead, AsyncWrite, TcpStream, Uri};
23 
/// `Connector` trait used by `async_impl::Client`. `Connector` provides
/// asynchronous connection establishment interfaces.
///
/// An implementor receives the target `Uri` and hands back a future that
/// resolves to either a ready-to-use stream or an error.
pub trait Connector {
    /// Streams that this connector produces.
    ///
    /// The stream must support asynchronous reading and writing and must be
    /// `Unpin + Sync + Send + 'static`.
    type Stream: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static;
    /// Possible errors that this connector may generate when attempting to
    /// connect.
    ///
    /// The error only has to be convertible into a boxed, thread-safe
    /// `std::error::Error`.
    type Error: Into<Box<dyn Error + Sync + Send>>;
    /// Futures generated by this connector when attempting to create a stream.
    ///
    /// The future resolves to `Result<Self::Stream, Self::Error>`.
    type Future: Future<Output = Result<Self::Stream, Self::Error>> + Unpin + Sync + Send + 'static;

    /// Attempts to establish a connection.
    ///
    /// `uri` identifies the target of the connection. The returned future is
    /// bounded by `'static`, so it cannot borrow from `self` or `uri`.
    fn connect(&self, uri: &Uri) -> Self::Future;
}
38 
/// Connector for creating HTTP or HTTPS connections asynchronously.
///
/// `HttpConnector` implements `async_impl::Connector` trait.
///
/// The derived `Default` implementation builds the connector from
/// `ConnectorConfig::default()`.
#[derive(Default)]
pub struct HttpConnector {
    // Connection settings consulted by `connect` (proxy matching and, when the
    // `__tls` feature is enabled, the TLS configuration).
    config: ConnectorConfig,
}
46 
47 impl HttpConnector {
48     /// Creates a new `HttpConnector` with a `ConnectorConfig`.
new(config: ConnectorConfig) -> HttpConnector49     pub(crate) fn new(config: ConnectorConfig) -> HttpConnector {
50         HttpConnector { config }
51     }
52 }
53 
54 // TODO: Fix this function after `ylong_runtime::TcpStream` support set_nodelay.
tcp_stream(addr: &str) -> io::Result<TcpStream>55 async fn tcp_stream(addr: &str) -> io::Result<TcpStream> {
56     // Here `addr` must contain a value if `to_socket_addrs` return `Ok`.
57     let addr = addr.to_socket_addrs()?.next().unwrap();
58 
59     #[cfg(feature = "tokio_base")]
60     {
61         TcpStream::connect(addr)
62             .await
63             .and_then(|stream| match stream.set_nodelay(true) {
64                 Ok(()) => Ok(stream),
65                 Err(e) => Err(e),
66             })
67     }
68 
69     #[cfg(feature = "ylong_base")]
70     TcpStream::connect(addr).await
71 }
72 
73 #[cfg(not(feature = "__tls"))]
74 mod no_tls {
75     use core::future::Future;
76     use core::pin::Pin;
77     use std::io::Error;
78 
79     use super::{tcp_stream, Connector, HttpConnector};
80     use crate::{TcpStream, Uri};
81 
82     impl Connector for HttpConnector {
83         type Stream = TcpStream;
84         type Error = Error;
85         type Future =
86             Pin<Box<dyn Future<Output = Result<Self::Stream, Self::Error>> + Sync + Send>>;
87 
connect(&self, uri: &Uri) -> Self::Future88         fn connect(&self, uri: &Uri) -> Self::Future {
89             // Checks if this uri need be proxied.
90             let addr = self
91                 .config
92                 .proxies
93                 .match_proxy(uri)
94                 .map(|proxy| proxy.via_proxy(uri).authority().unwrap().to_string())
95                 .unwrap_or(uri.authority().unwrap().to_string());
96 
97             Box::pin(async move { tcp_stream(&addr).await })
98         }
99     }
100 }
101 
/// `Connector` implementation used when TLS support is compiled in: plain
/// HTTP yields `MixStream::Http`, HTTPS yields `MixStream::Https`.
#[cfg(feature = "__tls")]
mod tls {
    use core::future::Future;
    use core::pin::Pin;
    use std::io::{Error, ErrorKind, Write};

    use super::{tcp_stream, Connector, HttpConnector};
    use crate::async_impl::ssl_stream::{AsyncSslStream, MixStream};
    use crate::error::CauseMessage;
    use crate::{AsyncReadExt, AsyncWriteExt, Scheme, TcpStream, Uri};

    impl Connector for HttpConnector {
        type Stream = MixStream<TcpStream>;
        type Error = Error;
        type Future =
            Pin<Box<dyn Future<Output = Result<Self::Stream, Self::Error>> + Sync + Send>>;

        /// Opens a connection for `uri`.
        ///
        /// * `http`: a TCP connection to the authority of `uri`, or to the
        ///   authority of the proxied URI when a configured proxy matches.
        /// * `https`: the same TCP connection; when a proxy matched, a
        ///   `CONNECT` tunnel to the target host and port is negotiated
        ///   first, then the TLS handshake is performed on the stream.
        ///
        /// # Errors
        ///
        /// The returned future fails (instead of panicking) when a part of
        /// `uri` required for the chosen path is missing: scheme and
        /// authority always, host for HTTPS, and a valid port for HTTPS
        /// through a proxy. It also fails on TCP, tunnel or TLS errors.
        fn connect(&self, uri: &Uri) -> Self::Future {
            // Copy everything needed out of `uri` up front: the returned
            // future is `'static` and cannot borrow from it.
            let mut addr = uri.authority().map(|authority| authority.to_string());
            // Host as written into the `CONNECT` request line.
            let host = uri.host().map(|host| host.as_str().to_string());
            // Host name handed to the TLS configuration.
            let host_name = uri.host().map(|host| host.to_string());
            let port = uri.port().and_then(|port| port.as_u16().ok());
            let mut auth = None;
            let mut is_proxy = false;

            if let Some(proxy) = self.config.proxies.match_proxy(uri) {
                addr = proxy
                    .via_proxy(uri)
                    .authority()
                    .map(|authority| authority.to_string());
                auth = proxy
                    .intercept
                    .proxy_info()
                    .basic_auth
                    .as_ref()
                    .and_then(|v| v.to_str().ok());
                is_proxy = true;
            }

            match uri.scheme() {
                None => Box::pin(async move { Err(other_io_error("no scheme in uri")) }),
                Some(scheme) => match *scheme {
                    Scheme::HTTP => Box::pin(async move {
                        let addr = addr.ok_or_else(|| other_io_error("no authority in uri"))?;
                        Ok(MixStream::Http(tcp_stream(&addr).await?))
                    }),
                    Scheme::HTTPS => {
                        let config = self.config.tls.clone();
                        Box::pin(async move {
                            // Validate all inputs before touching the network.
                            let addr =
                                addr.ok_or_else(|| other_io_error("no authority in uri"))?;
                            let host_name =
                                host_name.ok_or_else(|| other_io_error("no host in uri"))?;
                            let tunnel_target = if is_proxy {
                                let host = host.ok_or_else(|| other_io_error("no host in uri"))?;
                                let port =
                                    port.ok_or_else(|| other_io_error("no valid port in uri"))?;
                                Some((host, port))
                            } else {
                                None
                            };

                            let mut tcp = tcp_stream(&addr).await?;

                            // Through a proxy, TLS must run inside a tunnel
                            // to the real target.
                            if let Some((host, port)) = tunnel_target {
                                tcp = tunnel(tcp, host, port, auth).await?;
                            }

                            let mut stream = config
                                .ssl_new(&host_name)
                                .and_then(|ssl| AsyncSslStream::new(ssl.into_inner(), tcp))
                                .map_err(|e| Error::new(ErrorKind::Other, e))?;

                            Pin::new(&mut stream)
                                .connect()
                                .await
                                .map_err(|e| Error::new(ErrorKind::Other, e))?;
                            Ok(MixStream::Https(stream))
                        })
                    }
                },
            }
        }
    }

    /// Negotiates an HTTP `CONNECT` tunnel to `host:port` over `conn`, which
    /// must already be connected to the proxy.
    ///
    /// When `auth` is `Some`, its value is sent in a
    /// `Proxy-Authorization: Basic …` header. On success the same stream is
    /// returned, ready to carry the tunneled traffic.
    ///
    /// # Errors
    ///
    /// * I/O errors from writing the request or reading the response.
    /// * "error receiving from proxy" if the proxy closes the connection
    ///   before a complete response was read.
    /// * "proxy headers too long for tunnel" if a successful response does
    ///   not fit in the 8 KiB buffer.
    /// * "proxy authentication required" on a `407` response.
    /// * "unsuccessful tunnel" on any other response.
    async fn tunnel(
        mut conn: TcpStream,
        host: String,
        port: u16,
        auth: Option<String>,
    ) -> Result<TcpStream, Error> {
        // Number of bytes needed to recognise the status, e.g. `HTTP/1.1 200`.
        const STATUS_PREFIX_LEN: usize = 12;

        let mut req = Vec::new();

        write!(
            &mut req,
            "CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\n"
        )?;

        if let Some(value) = auth {
            write!(&mut req, "Proxy-Authorization: Basic {value}\r\n")?;
        }

        write!(&mut req, "\r\n")?;

        conn.write_all(&req).await?;

        let mut buf = [0; 8192];
        let mut pos = 0;

        loop {
            let n = conn.read(&mut buf[pos..]).await?;

            if n == 0 {
                return Err(other_io_error("error receiving from proxy"));
            }

            pos += n;
            let resp = &buf[..pos];

            // TCP may deliver the status line in pieces; do not judge the
            // response before the status prefix has fully arrived.
            if resp.len() < STATUS_PREFIX_LEN {
                continue;
            }

            // Proxies may answer a CONNECT with either HTTP version.
            if resp.starts_with(b"HTTP/1.1 200") || resp.starts_with(b"HTTP/1.0 200") {
                if resp.ends_with(b"\r\n\r\n") {
                    return Ok(conn);
                }
                if pos == buf.len() {
                    return Err(other_io_error("proxy headers too long for tunnel"));
                }
            } else if resp.starts_with(b"HTTP/1.1 407") || resp.starts_with(b"HTTP/1.0 407") {
                return Err(other_io_error("proxy authentication required"));
            } else {
                return Err(other_io_error("unsuccessful tunnel"));
            }
        }
    }

    /// Builds an `ErrorKind::Other` I/O error whose cause is `msg`.
    fn other_io_error(msg: &str) -> Error {
        Error::new(ErrorKind::Other, CauseMessage::new(msg))
    }
}
224