1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! Asynchronous `Connector` trait and `HttpConnector` implementation.
15
16 use core::future::Future;
17 use std::error::Error;
18 use std::io;
19 use std::net::ToSocketAddrs;
20
21 use crate::util::ConnectorConfig;
22 use crate::{AsyncRead, AsyncWrite, TcpStream, Uri};
23
24 /// `Connector` trait used by `async_impl::Client`. `Connector` provides
25 /// asynchronous connection establishment interfaces.
26 pub trait Connector {
27 /// Streams that this connector produces.
28 type Stream: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static;
29 /// Possible errors that this connector may generate when attempting to
30 /// connect.
31 type Error: Into<Box<dyn Error + Sync + Send>>;
32 /// Futures generated by this connector when attempting to create a stream.
33 type Future: Future<Output = Result<Self::Stream, Self::Error>> + Unpin + Sync + Send + 'static;
34
35 /// Attempts to establish a connection.
connect(&self, uri: &Uri) -> Self::Future36 fn connect(&self, uri: &Uri) -> Self::Future;
37 }
38
39 /// Connector for creating HTTP or HTTPS connections asynchronously.
40 ///
41 /// `HttpConnector` implements `async_impl::Connector` trait.
42 #[derive(Default)]
43 pub struct HttpConnector {
44 config: ConnectorConfig,
45 }
46
47 impl HttpConnector {
48 /// Creates a new `HttpConnector` with a `ConnectorConfig`.
new(config: ConnectorConfig) -> HttpConnector49 pub(crate) fn new(config: ConnectorConfig) -> HttpConnector {
50 HttpConnector { config }
51 }
52 }
53
54 // TODO: Fix this function after `ylong_runtime::TcpStream` support set_nodelay.
tcp_stream(addr: &str) -> io::Result<TcpStream>55 async fn tcp_stream(addr: &str) -> io::Result<TcpStream> {
56 // Here `addr` must contain a value if `to_socket_addrs` return `Ok`.
57 let addr = addr.to_socket_addrs()?.next().unwrap();
58
59 #[cfg(feature = "tokio_base")]
60 {
61 TcpStream::connect(addr)
62 .await
63 .and_then(|stream| match stream.set_nodelay(true) {
64 Ok(()) => Ok(stream),
65 Err(e) => Err(e),
66 })
67 }
68
69 #[cfg(feature = "ylong_base")]
70 TcpStream::connect(addr).await
71 }
72
73 #[cfg(not(feature = "__tls"))]
74 mod no_tls {
75 use core::future::Future;
76 use core::pin::Pin;
77 use std::io::Error;
78
79 use super::{tcp_stream, Connector, HttpConnector};
80 use crate::{TcpStream, Uri};
81
82 impl Connector for HttpConnector {
83 type Stream = TcpStream;
84 type Error = Error;
85 type Future =
86 Pin<Box<dyn Future<Output = Result<Self::Stream, Self::Error>> + Sync + Send>>;
87
connect(&self, uri: &Uri) -> Self::Future88 fn connect(&self, uri: &Uri) -> Self::Future {
89 // Checks if this uri need be proxied.
90 let addr = self
91 .config
92 .proxies
93 .match_proxy(uri)
94 .map(|proxy| proxy.via_proxy(uri).authority().unwrap().to_string())
95 .unwrap_or(uri.authority().unwrap().to_string());
96
97 Box::pin(async move { tcp_stream(&addr).await })
98 }
99 }
100 }
101
102 #[cfg(feature = "__tls")]
103 mod tls {
104 use core::future::Future;
105 use core::pin::Pin;
106 use std::io::{Error, ErrorKind, Write};
107
108 use super::{tcp_stream, Connector, HttpConnector};
109 use crate::async_impl::ssl_stream::{AsyncSslStream, MixStream};
110 use crate::error::CauseMessage;
111 use crate::{AsyncReadExt, AsyncWriteExt, Scheme, TcpStream, Uri};
112
113 impl Connector for HttpConnector {
114 type Stream = MixStream<TcpStream>;
115 type Error = Error;
116 type Future =
117 Pin<Box<dyn Future<Output = Result<Self::Stream, Self::Error>> + Sync + Send>>;
118
connect(&self, uri: &Uri) -> Self::Future119 fn connect(&self, uri: &Uri) -> Self::Future {
120 // Make sure all parts of uri is accurate.
121 let mut addr = uri.authority().unwrap().to_string();
122 let host = uri.host().unwrap().as_str().to_string();
123 let port = uri.port().unwrap().as_u16().unwrap();
124 let mut auth = None;
125 let mut is_proxy = false;
126
127 if let Some(proxy) = self.config.proxies.match_proxy(uri) {
128 addr = proxy.via_proxy(uri).authority().unwrap().to_string();
129 auth = proxy
130 .intercept
131 .proxy_info()
132 .basic_auth
133 .as_ref()
134 .and_then(|v| v.to_str().ok());
135 is_proxy = true;
136 }
137
138 let host_name = uri
139 .host()
140 .map(|host| host.to_string())
141 .unwrap_or_else(|| "no host in uri".to_string());
142
143 match *uri.scheme().unwrap() {
144 Scheme::HTTP => {
145 Box::pin(async move { Ok(MixStream::Http(tcp_stream(&addr).await?)) })
146 }
147 Scheme::HTTPS => {
148 let config = self.config.tls.clone();
149 Box::pin(async move {
150 let mut tcp = tcp_stream(&addr).await?;
151
152 if is_proxy {
153 tcp = tunnel(tcp, host, port, auth).await?;
154 };
155
156 let mut stream = config
157 .ssl_new(&host_name)
158 .and_then(|ssl| AsyncSslStream::new(ssl.into_inner(), tcp))
159 .map_err(|e| Error::new(ErrorKind::Other, e))?;
160
161 Pin::new(&mut stream)
162 .connect()
163 .await
164 .map_err(|e| Error::new(ErrorKind::Other, e))?;
165 Ok(MixStream::Https(stream))
166 })
167 }
168 }
169 }
170 }
171
tunnel( mut conn: TcpStream, host: String, port: u16, auth: Option<String>, ) -> Result<TcpStream, Error>172 async fn tunnel(
173 mut conn: TcpStream,
174 host: String,
175 port: u16,
176 auth: Option<String>,
177 ) -> Result<TcpStream, Error> {
178 let mut req = Vec::new();
179
180 write!(
181 &mut req,
182 "CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\n"
183 )?;
184
185 if let Some(value) = auth {
186 write!(&mut req, "Proxy-Authorization: Basic {value}\r\n")?;
187 }
188
189 write!(&mut req, "\r\n")?;
190
191 conn.write_all(&req).await?;
192
193 let mut buf = [0; 8192];
194 let mut pos = 0;
195
196 loop {
197 let n = conn.read(&mut buf[pos..]).await?;
198
199 if n == 0 {
200 return Err(other_io_error("error receiving from proxy"));
201 }
202
203 pos += n;
204 let resp = &buf[..pos];
205 if resp.starts_with(b"HTTP/1.1 200") {
206 if resp.ends_with(b"\r\n\r\n") {
207 return Ok(conn);
208 }
209 if pos == buf.len() {
210 return Err(other_io_error("proxy headers too long for tunnel"));
211 }
212 } else if resp.starts_with(b"HTTP/1.1 407") {
213 return Err(other_io_error("proxy authentication required"));
214 } else {
215 return Err(other_io_error("unsuccessful tunnel"));
216 }
217 }
218 }
219
other_io_error(msg: &str) -> Error220 fn other_io_error(msg: &str) -> Error {
221 Error::new(ErrorKind::Other, CauseMessage::new(msg))
222 }
223 }
224