1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! Asynchronous `Connector` trait and `HttpConnector` implementation.
15
16 mod stream;
17
18 use core::future::Future;
19 use std::io::{Error, ErrorKind};
20 use std::net::SocketAddr;
21 use std::str::FromStr;
22 use std::sync::Arc;
23
24 use ylong_http::request::uri::Uri;
25 #[cfg(feature = "http3")]
26 use ylong_runtime::net::{ConnectedUdpSocket, UdpSocket};
27
28 use crate::async_impl::dns::{DefaultDnsResolver, EyeBallConfig, HappyEyeballs, Resolver};
29 use crate::runtime::{AsyncRead, AsyncWrite, TcpStream};
30 use crate::util::config::{ConnectorConfig, HttpVersion};
// `ConnInfo`: trait describing the information of an IO.
32 use crate::util::ConnInfo;
33 use crate::{HttpClientError, Timeout};
34
/// `Connector` trait used by `async_impl::Client`. `Connector` provides
/// asynchronous connection establishment interfaces.
///
/// An implementation turns a target `Uri` into a future that resolves to a
/// ready-to-use IO stream, or to an `HttpClientError` when the connection
/// cannot be established.
pub trait Connector {
    /// Streams that this connector produces.
    ///
    /// The stream must support asynchronous reads and writes, expose
    /// connection information through `ConnInfo`, and be `Unpin`, `Sync`,
    /// `Send` and `'static`.
    type Stream: AsyncRead + AsyncWrite + ConnInfo + Unpin + Sync + Send + 'static;

    /// Futures generated by this connector when attempting to create a stream.
    ///
    /// The future resolves to `Ok(Self::Stream)` on success and to
    /// `Err(HttpClientError)` on failure.
    type Future: Future<Output = Result<Self::Stream, HttpClientError>>
        + Unpin
        + Sync
        + Send
        + 'static;

    /// Attempts to establish a connection.
    ///
    /// * `uri` - the target of the connection.
    /// * `http_version` - the HTTP version the caller wants to use on the
    ///   connection.
    ///
    /// Returns the future that performs the connection attempt.
    fn connect(&self, uri: &Uri, http_version: HttpVersion) -> Self::Future;
}
51
/// Connector for creating HTTP or HTTPS connections asynchronously.
///
/// `HttpConnector` implements `async_impl::Connector` trait.
pub struct HttpConnector {
    // Connection settings; `connect` reads the proxy list and the connect
    // timeout from it (and the TLS settings when TLS support is compiled in).
    config: ConnectorConfig,
    // Shared DNS resolver used to turn an authority into socket addresses.
    resolver: Arc<dyn Resolver>,
}
59
60 impl HttpConnector {
61 /// Creates a new `HttpConnector` with a `ConnectorConfig`.
new(config: ConnectorConfig, resolver: Arc<dyn Resolver>) -> Self62 pub(crate) fn new(config: ConnectorConfig, resolver: Arc<dyn Resolver>) -> Self {
63 Self { config, resolver }
64 }
65
66 /// Creates a new `HttpConnector` with a given dns `Resolver`.
with_dns_resolver<R>(resolver: R) -> Self where R: Resolver,67 pub(crate) fn with_dns_resolver<R>(resolver: R) -> Self
68 where
69 R: Resolver,
70 {
71 let resolver = Arc::new(resolver) as Arc<dyn Resolver>;
72 Self {
73 config: Default::default(),
74 resolver,
75 }
76 }
77 }
78
79 impl Default for HttpConnector {
default() -> Self80 fn default() -> Self {
81 Self {
82 config: Default::default(),
83 resolver: Arc::new(DefaultDnsResolver::default()),
84 }
85 }
86 }
87
tcp_stream(eyeballs: HappyEyeballs) -> Result<TcpStream, HttpClientError>88 async fn tcp_stream(eyeballs: HappyEyeballs) -> Result<TcpStream, HttpClientError> {
89 eyeballs
90 .connect()
91 .await
92 .map_err(|e| {
93 #[cfg(target_os = "linux")]
94 if format!("{}", e).contains("failed to lookup address information") {
95 return HttpClientError::from_dns_error(crate::ErrorKind::Connect, e);
96 }
97 #[cfg(target_os = "windows")]
98 if let Some(code) = e.raw_os_error() {
99 if (0x2329..=0x26B2).contains(&code) || code == 0x2AF9 {
100 return HttpClientError::from_dns_error(crate::ErrorKind::Connect, e);
101 }
102 }
103 HttpClientError::from_io_error(crate::ErrorKind::Connect, e)
104 })
105 .and_then(|stream| match stream.set_nodelay(true) {
106 Ok(()) => Ok(stream),
107 Err(e) => err_from_io!(Connect, e),
108 })
109 }
110
dns_query( resolver: Arc<dyn Resolver>, addr: &str, ) -> Result<Vec<SocketAddr>, HttpClientError>111 async fn dns_query(
112 resolver: Arc<dyn Resolver>,
113 addr: &str,
114 ) -> Result<Vec<SocketAddr>, HttpClientError> {
115 if let Ok(socket_addr) = SocketAddr::from_str(addr) {
116 return Ok(vec![socket_addr]);
117 }
118 let addr_fut = resolver.resolve(addr);
119 let socket_addr = addr_fut.await.map_err(|e| {
120 HttpClientError::from_dns_error(
121 crate::ErrorKind::Connect,
122 Error::new(ErrorKind::Interrupted, e),
123 )
124 })?;
125 Ok(socket_addr.collect::<Vec<_>>())
126 }
127
eyeballs_connect( addrs: Vec<SocketAddr>, timeout: Timeout, ) -> Result<TcpStream, HttpClientError>128 async fn eyeballs_connect(
129 addrs: Vec<SocketAddr>,
130 timeout: Timeout,
131 ) -> Result<TcpStream, HttpClientError> {
132 let eyeball_config = EyeBallConfig::new(timeout.inner(), None);
133 let happy_eyeballs = HappyEyeballs::new(addrs, eyeball_config);
134 tcp_stream(happy_eyeballs).await
135 }
136
/// Binds a UDP socket on an unspecified local address of the same IP family
/// as `addr` (port 0) and connects it to `addr`.
///
/// # Errors
/// Both a bind failure and a connect failure are reported as IO errors of
/// kind `Connect`.
#[cfg(feature = "http3")]
pub(crate) async fn udp_stream(
    addr: &std::net::SocketAddr,
) -> Result<ConnectedUdpSocket, HttpClientError> {
    // The wildcard address has to match the family of the peer.
    let wildcard = if addr.is_ipv4() {
        "0.0.0.0:0"
    } else {
        "[::]:0"
    };

    let socket = match UdpSocket::bind(wildcard).await {
        Ok(socket) => socket,
        Err(e) => {
            return Err(HttpClientError::from_io_error(
                crate::ErrorKind::Connect,
                e,
            ))
        }
    };

    match socket.connect(addr).await {
        Ok(connected) => Ok(connected),
        Err(e) => Err(HttpClientError::from_io_error(
            crate::ErrorKind::Connect,
            e,
        )),
    }
}
152
153 #[cfg(not(feature = "__tls"))]
154 mod no_tls {
155 use core::future::Future;
156 use core::pin::Pin;
157 use std::time::Instant;
158
159 use ylong_http::request::uri::Uri;
160
161 use super::{eyeballs_connect, Connector, HttpConnector};
162 use crate::async_impl::connector::dns_query;
163 use crate::async_impl::connector::stream::HttpStream;
164 use crate::runtime::TcpStream;
165 use crate::util::config::HttpVersion;
166 use crate::util::interceptor::ConnProtocol;
167 use crate::{ConnData, ConnDetail, HttpClientError, TimeGroup};
168
169 impl Connector for HttpConnector {
170 type Stream = HttpStream<TcpStream>;
171 type Future =
172 Pin<Box<dyn Future<Output = Result<Self::Stream, HttpClientError>> + Sync + Send>>;
173
connect(&self, uri: &Uri, _http_version: HttpVersion) -> Self::Future174 fn connect(&self, uri: &Uri, _http_version: HttpVersion) -> Self::Future {
175 // Checks if this uri need be proxied.
176 let mut is_proxy = false;
177 let mut addr = uri.authority().unwrap().to_string();
178 if let Some(proxy) = self.config.proxies.match_proxy(uri) {
179 addr = proxy.via_proxy(uri).authority().unwrap().to_string();
180 is_proxy = true;
181 }
182
183 let resolver = self.resolver.clone();
184 let timeout = self.config.timeout.clone();
185 Box::pin(async move {
186 let mut time_group = TimeGroup::default();
187 time_group.set_dns_start(Instant::now());
188 let socket_addrs = dns_query(resolver, addr.as_str()).await?;
189 time_group.set_dns_end(Instant::now());
190 time_group.set_tcp_start(Instant::now());
191 let stream = eyeballs_connect(socket_addrs, timeout).await?;
192 time_group.set_tcp_end(Instant::now());
193 let local = stream
194 .local_addr()
195 .map_err(|e| HttpClientError::from_io_error(crate::ErrorKind::Connect, e))?;
196 let peer = stream
197 .peer_addr()
198 .map_err(|e| HttpClientError::from_io_error(crate::ErrorKind::Connect, e))?;
199 let detail = ConnDetail {
200 protocol: ConnProtocol::Tcp,
201 local,
202 peer,
203 addr,
204 };
205
206 let data = ConnData::builder()
207 .time_group(time_group)
208 .proxy(is_proxy)
209 .build(detail);
210 Ok(HttpStream::new(stream, data))
211 })
212 }
213 }
214 }
215
216 #[cfg(feature = "__tls")]
217 mod tls {
218 use core::future::Future;
219 use core::pin::Pin;
220 use std::error;
221 use std::fmt::{Debug, Display, Formatter};
222 use std::io::{Error, ErrorKind, Write};
223 use std::time::Instant;
224
225 use ylong_http::request::uri::{Scheme, Uri};
226
227 use super::{eyeballs_connect, Connector, HttpConnector};
228 use crate::async_impl::connector::dns_query;
229 use crate::async_impl::connector::stream::HttpStream;
230 use crate::async_impl::mix::MixStream;
231 #[cfg(feature = "http3")]
232 use crate::async_impl::quic::QuicConn;
233 use crate::async_impl::ssl_stream::AsyncSslStream;
234 #[cfg(all(target_os = "linux", feature = "ylong_base", feature = "__tls"))]
235 use crate::config::FchownConfig;
236 use crate::runtime::{AsyncReadExt, AsyncWriteExt, TcpStream};
237 use crate::util::config::HttpVersion;
238 #[cfg(feature = "http2")]
239 use crate::util::information::NegotiateInfo;
240 use crate::util::interceptor::ConnProtocol;
241 use crate::{ConnData, ConnDetail, HttpClientError, TimeGroup, TlsConfig};
242
243 impl Connector for HttpConnector {
244 type Stream = HttpStream<MixStream>;
245 type Future =
246 Pin<Box<dyn Future<Output = Result<Self::Stream, HttpClientError>> + Sync + Send>>;
247
connect(&self, uri: &Uri, _http_version: HttpVersion) -> Self::Future248 fn connect(&self, uri: &Uri, _http_version: HttpVersion) -> Self::Future {
249 // Make sure all parts of uri is accurate.
250 let mut addr = uri.authority().unwrap().to_string();
251 let mut auth = None;
252 let mut is_proxy = false;
253
254 if let Some(proxy) = self.config.proxies.match_proxy(uri) {
255 addr = proxy.via_proxy(uri).authority().unwrap().to_string();
256 auth = proxy
257 .intercept
258 .proxy_info()
259 .basic_auth
260 .as_ref()
261 .and_then(|v| v.to_string().ok());
262 is_proxy = true;
263 }
264 #[cfg(all(target_os = "linux", feature = "ylong_base", feature = "__tls"))]
265 let fchown = self.config.fchown.clone();
266 let resolver = self.resolver.clone();
267 let timeout = self.config.timeout.clone();
268 match *uri.scheme().unwrap() {
269 Scheme::HTTP => Box::pin(async move {
270 let mut time_group = TimeGroup::default();
271 time_group.set_dns_start(Instant::now());
272 let socket_addrs = dns_query(resolver, addr.as_str()).await?;
273 time_group.set_dns_end(Instant::now());
274 time_group.set_tcp_start(Instant::now());
275 let stream = eyeballs_connect(socket_addrs, timeout).await?;
276 time_group.set_tcp_end(Instant::now());
277 #[cfg(all(target_os = "linux", feature = "ylong_base", feature = "__tls"))]
278 if let Some(fchown) = fchown {
279 let _ = stream.fchown(fchown.uid, fchown.gid);
280 }
281
282 let local = stream.local_addr().map_err(|e| {
283 HttpClientError::from_io_error(crate::ErrorKind::Connect, e)
284 })?;
285 let peer = stream.peer_addr().map_err(|e| {
286 HttpClientError::from_io_error(crate::ErrorKind::Connect, e)
287 })?;
288 let detail = ConnDetail {
289 protocol: ConnProtocol::Tcp,
290 local,
291 peer,
292 addr,
293 };
294 let data = ConnData::builder()
295 .time_group(time_group)
296 .proxy(is_proxy)
297 .build(detail);
298
299 Ok(HttpStream::new(MixStream::Http(stream), data))
300 }),
301 Scheme::HTTPS => {
302 let host = uri.host().unwrap().to_string();
303 let port = uri.port().unwrap().as_u16().unwrap();
304 let config = self.config.tls.clone();
305 #[cfg(feature = "http3")]
306 if _http_version == HttpVersion::Http3 {
307 return Box::pin(async move {
308 let mut time_group = TimeGroup::default();
309 time_group.set_dns_start(Instant::now());
310 let addr_fut = resolver.resolve(&addr);
311 let addrs = addr_fut.await.map_err(|e| {
312 HttpClientError::from_dns_error(
313 crate::ErrorKind::Connect,
314 Error::new(ErrorKind::Interrupted, e),
315 )
316 })?;
317 time_group.set_dns_end(Instant::now());
318 time_group.set_quic_start(Instant::now());
319 let mut last_e = None;
320 for addr_it in addrs {
321 let udp_socket = match super::udp_stream(&addr_it).await {
322 Ok(socket) => socket,
323 Err(e) => {
324 last_e = Some(e);
325 continue;
326 }
327 };
328 let local = udp_socket.local_addr().map_err(|e| {
329 HttpClientError::from_io_error(crate::ErrorKind::Connect, e)
330 })?;
331 let peer = udp_socket.peer_addr().map_err(|e| {
332 HttpClientError::from_io_error(crate::ErrorKind::Connect, e)
333 })?;
334 let detail = ConnDetail {
335 protocol: ConnProtocol::Quic,
336 local,
337 peer,
338 addr: addr.clone(),
339 };
340
341 let mut data = ConnData::builder()
342 .time_group(time_group.clone())
343 .proxy(is_proxy)
344 .build(detail);
345
346 let mut stream =
347 HttpStream::new(MixStream::Udp(udp_socket), data.clone());
348 let Ok(quic_conn) =
349 QuicConn::connect(&mut stream, &config, &host).await
350 else {
351 continue;
352 };
353 stream.set_quic_conn(quic_conn);
354 data.time_group_mut().set_quic_end(Instant::now());
355 stream.set_conn_data(data);
356 return Ok(stream);
357 }
358
359 Err(last_e.unwrap_or(HttpClientError::from_str(
360 crate::ErrorKind::Connect,
361 "connect failed",
362 )))
363 });
364 }
365 Box::pin(async move {
366 let mut time_group = TimeGroup::default();
367 time_group.set_dns_start(Instant::now());
368 let socket_addrs = dns_query(resolver, addr.as_str()).await?;
369 time_group.set_dns_end(Instant::now());
370 time_group.set_tcp_start(Instant::now());
371 let stream = eyeballs_connect(socket_addrs, timeout).await?;
372 time_group.set_tcp_end(Instant::now());
373 #[cfg(all(target_os = "linux", feature = "ylong_base", feature = "__tls"))]
374 {
375 https_connect(
376 config,
377 addr,
378 stream,
379 is_proxy,
380 (auth, host, port),
381 fchown,
382 time_group,
383 )
384 .await
385 }
386 #[cfg(not(all(
387 target_os = "linux",
388 feature = "ylong_base",
389 feature = "__tls"
390 )))]
391 {
392 https_connect(
393 config,
394 addr,
395 stream,
396 is_proxy,
397 (auth, host, port),
398 time_group,
399 )
400 .await
401 }
402 })
403 }
404 }
405 }
406 }
407
https_connect( config: TlsConfig, addr: String, tcp_stream: TcpStream, is_proxy: bool, (auth, host, port): (Option<String>, String, u16), #[cfg(all(target_os = "linux", feature = "ylong_base", feature = "__tls"))] fchown: Option< FchownConfig, >, mut time_group: TimeGroup, ) -> Result<HttpStream<MixStream>, HttpClientError>408 async fn https_connect(
409 config: TlsConfig,
410 addr: String,
411 tcp_stream: TcpStream,
412 is_proxy: bool,
413 (auth, host, port): (Option<String>, String, u16),
414 #[cfg(all(target_os = "linux", feature = "ylong_base", feature = "__tls"))] fchown: Option<
415 FchownConfig,
416 >,
417 mut time_group: TimeGroup,
418 ) -> Result<HttpStream<MixStream>, HttpClientError> {
419 let mut tcp = tcp_stream;
420 #[cfg(all(target_os = "linux", feature = "ylong_base", feature = "__tls"))]
421 if let Some(fchown) = fchown {
422 let _ = tcp.fchown(fchown.uid, fchown.gid);
423 }
424 let local = tcp
425 .local_addr()
426 .map_err(|e| HttpClientError::from_io_error(crate::ErrorKind::Connect, e))?;
427 let peer = tcp
428 .peer_addr()
429 .map_err(|e| HttpClientError::from_io_error(crate::ErrorKind::Connect, e))?;
430 if is_proxy {
431 tcp = tunnel(tcp, &host, port, auth)
432 .await
433 .map_err(|e| HttpClientError::from_io_error(crate::ErrorKind::Connect, e))?;
434 };
435
436 let pinned_key = config.pinning_host_match(addr.as_str());
437 let mut stream = config
438 .ssl_new(&host)
439 .and_then(|ssl| AsyncSslStream::new(ssl.into_inner(), tcp, pinned_key))
440 .map_err(|e| {
441 HttpClientError::from_tls_error(
442 crate::ErrorKind::Connect,
443 Error::new(ErrorKind::Other, e),
444 )
445 })?;
446
447 time_group.set_tls_start(Instant::now());
448 Pin::new(&mut stream).connect().await.map_err(|e| {
449 HttpClientError::from_tls_error(
450 crate::ErrorKind::Connect,
451 Error::new(ErrorKind::Other, e),
452 )
453 })?;
454 time_group.set_tls_end(Instant::now());
455
456 #[cfg(feature = "http2")]
457 let alpn = stream.negotiated_alpn_protocol().map(Vec::from);
458 let detail = ConnDetail {
459 protocol: ConnProtocol::Tcp,
460 local,
461 peer,
462 addr,
463 };
464
465 #[cfg(feature = "http2")]
466 let data = ConnData::builder()
467 .time_group(time_group)
468 .proxy(is_proxy)
469 .negotiate(NegotiateInfo::from_alpn(alpn))
470 .build(detail);
471
472 #[cfg(not(feature = "http2"))]
473 let data = ConnData::builder()
474 .time_group(time_group)
475 .proxy(is_proxy)
476 .build(detail);
477
478 Ok(HttpStream::new(MixStream::Https(stream), data))
479 }
480
tunnel( mut conn: TcpStream, host: &str, port: u16, auth: Option<String>, ) -> Result<TcpStream, Error>481 async fn tunnel(
482 mut conn: TcpStream,
483 host: &str,
484 port: u16,
485 auth: Option<String>,
486 ) -> Result<TcpStream, Error> {
487 let mut req = Vec::new();
488
489 write!(
490 &mut req,
491 "CONNECT {host}:{port} HTTP/1.1\r\nHost: {host}:{port}\r\n"
492 )?;
493
494 if let Some(value) = auth {
495 write!(&mut req, "Proxy-Authorization: Basic {value}\r\n")?;
496 }
497
498 write!(&mut req, "\r\n")?;
499
500 conn.write_all(&req).await?;
501
502 let mut buf = [0; 8192];
503 let mut pos = 0;
504
505 loop {
506 let n = conn.read(&mut buf[pos..]).await?;
507
508 if n == 0 {
509 return Err(other_io_error(CreateTunnelErr::Unsuccessful));
510 }
511
512 pos += n;
513 let resp = &buf[..pos];
514 if resp.starts_with(b"HTTP/1.1 200") || resp.starts_with(b"HTTP/1.0 200") {
515 if resp.ends_with(b"\r\n\r\n") {
516 return Ok(conn);
517 }
518 if pos == buf.len() {
519 return Err(other_io_error(CreateTunnelErr::ProxyHeadersTooLong));
520 }
521 } else if resp.starts_with(b"HTTP/1.1 407") {
522 return Err(other_io_error(CreateTunnelErr::ProxyAuthenticationRequired));
523 } else {
524 return Err(other_io_error(CreateTunnelErr::Unsuccessful));
525 }
526 }
527 }
528
other_io_error(err: CreateTunnelErr) -> Error529 fn other_io_error(err: CreateTunnelErr) -> Error {
530 Error::new(ErrorKind::Other, err)
531 }
532
/// Reasons why creating a proxy tunnel failed.
enum CreateTunnelErr {
    /// The proxy response filled the read buffer before its end was seen.
    ProxyHeadersTooLong,
    /// The proxy answered with status `407`.
    ProxyAuthenticationRequired,
    /// The proxy closed the connection or answered with an unexpected
    /// response.
    Unsuccessful,
}

impl Debug for CreateTunnelErr {
    /// Writes the human-readable message of the variant.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::ProxyHeadersTooLong => "Proxy headers too long for tunnel",
            Self::ProxyAuthenticationRequired => "Proxy authentication required",
            Self::Unsuccessful => "Unsuccessful tunnel",
        };
        f.write_str(message)
    }
}

impl Display for CreateTunnelErr {
    /// Uses the same text as the `Debug` implementation.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        <Self as Debug>::fmt(self, f)
    }
}

impl error::Error for CreateTunnelErr {}
556
#[cfg(all(test, feature = "__tls"))]
mod ut_tunnel_error_debug {
    use crate::async_impl::connector::tls::CreateTunnelErr;

    /// UT test cases for `Debug` and `Display` of `CreateTunnelErr`.
    ///
    /// # Brief
    /// 1. Formats every `CreateTunnelErr` variant with `{:?}` and `{}`.
    /// 2. Checks if the result is as expected.
    #[test]
    fn ut_tunnel_error_debug_assert() {
        let cases = [
            (
                CreateTunnelErr::ProxyHeadersTooLong,
                "Proxy headers too long for tunnel",
            ),
            (
                CreateTunnelErr::ProxyAuthenticationRequired,
                "Proxy authentication required",
            ),
            (CreateTunnelErr::Unsuccessful, "Unsuccessful tunnel"),
        ];

        for (err, expected) in cases.iter() {
            assert_eq!(format!("{:?}", err), *expected);
            assert_eq!(format!("{}", err), *expected);
        }
    }
}
594
595 #[cfg(all(test, feature = "__tls", feature = "ylong_base"))]
596 mod ut_create_tunnel_err_debug {
597 use std::net::SocketAddr;
598 use std::str::FromStr;
599
600 use ylong_runtime::io::AsyncWriteExt;
601
602 use crate::async_impl::connector::tcp_stream;
603 use crate::async_impl::connector::tls::{other_io_error, tunnel, CreateTunnelErr};
604 use crate::async_impl::dns::{EyeBallConfig, HappyEyeballs};
605 use crate::start_tcp_server;
606 use crate::util::test_utils::{format_header_str, TcpHandle};
607
/// UT test cases for the failure paths of `tunnel`.
///
/// # Brief
/// 1. Starts a test server configured without a `Response`, connects with
///    `tcp_stream`, calls `tunnel` and expects
///    `CreateTunnelErr::Unsuccessful`.
/// 2. Repeats with a server answering `407` and expects
///    `CreateTunnelErr::ProxyAuthenticationRequired`.
/// 3. Repeats with a server answering `402` and expects
///    `CreateTunnelErr::Unsuccessful`.
#[test]
fn ut_ssl_tunnel_error() {
    let mut handles = vec![];
    // Scenario 1: no `Response` is configured for this server.
    // NOTE(review): presumably the server then closes the connection without
    // replying — confirm against `start_tcp_server!`.
    start_tcp_server!(
        Handles: handles,
        EndWith: "\r\n\r\n",
        Shutdown: std::net::Shutdown::Both,
    );
    let handle = handles.pop().expect("No more handles !");

    let eyeballs = HappyEyeballs::new(
        vec![SocketAddr::from_str(handle.addr.as_str()).unwrap()],
        EyeBallConfig::new(None, None),
    );

    let handle = ylong_runtime::spawn(async move {
        let tcp = tcp_stream(eyeballs).await.unwrap();
        let res = tunnel(
            tcp,
            "ylong_http.com",
            443,
            Some(String::from("base64 bytes")),
        )
        .await;
        // Errors are compared through their `Debug` output.
        assert_eq!(
            format!("{:?}", res.err()),
            format!("{:?}", Some(other_io_error(CreateTunnelErr::Unsuccessful)))
        );
        // Wait for the server side to report that it has finished.
        handle
            .server_shutdown
            .recv()
            .expect("server send order failed !");
    });
    ylong_runtime::block_on(handle).unwrap();

    // Scenario 2: the server answers with status 407.
    start_tcp_server!(
        Handles: handles,
        EndWith: "\r\n\r\n",
        Response: {
            Status: 407,
            Version: "HTTP/1.1",
            Header: "Content-Length", "11",
            Body: "METHOD GET!",
        },
        Shutdown: std::net::Shutdown::Both,
    );
    let handle = handles.pop().expect("No more handles !");

    let eyeballs = HappyEyeballs::new(
        vec![SocketAddr::from_str(handle.addr.as_str()).unwrap()],
        EyeBallConfig::new(None, None),
    );
    let handle = ylong_runtime::spawn(async move {
        let tcp = tcp_stream(eyeballs).await.unwrap();
        let res = tunnel(
            tcp,
            "ylong_http.com",
            443,
            Some(String::from("base64 bytes")),
        )
        .await;
        assert_eq!(
            format!("{:?}", res.err()),
            format!(
                "{:?}",
                Some(other_io_error(CreateTunnelErr::ProxyAuthenticationRequired))
            )
        );
        handle
            .server_shutdown
            .recv()
            .expect("server send order failed !");
    });
    ylong_runtime::block_on(handle).unwrap();

    // Scenario 3: the server answers with a status that is neither 200 nor
    // 407.
    start_tcp_server!(
        Handles: handles,
        EndWith: "\r\n\r\n",
        Response: {
            Status: 402,
            Version: "HTTP/1.1",
            Header: "Content-Length", "11",
            Body: "METHOD GET!",
        },
        Shutdown: std::net::Shutdown::Both,
    );
    let handle = handles.pop().expect("No more handles !");

    let eyeballs = HappyEyeballs::new(
        vec![SocketAddr::from_str(handle.addr.as_str()).unwrap()],
        EyeBallConfig::new(None, None),
    );
    let handle = ylong_runtime::spawn(async move {
        let tcp = tcp_stream(eyeballs).await.unwrap();
        let res = tunnel(
            tcp,
            "ylong_http.com",
            443,
            Some(String::from("base64 bytes")),
        )
        .await;
        assert_eq!(
            format!("{:?}", res.err()),
            format!("{:?}", Some(other_io_error(CreateTunnelErr::Unsuccessful)))
        );
        handle
            .server_shutdown
            .recv()
            .expect("server send order failed !");
    });
    ylong_runtime::block_on(handle).unwrap();
}
726
/// UT test cases for the success path of `tunnel`.
///
/// # Brief
/// 1. Starts a test server answering `200` with an empty body.
/// 2. Creates a `tcp stream` by calling `tcp_stream`.
/// 3. Sends a `CONNECT` request by `tunnel`.
/// 4. Checks that `tunnel` returns `Ok`.
#[test]
fn ut_ssl_tunnel_connect() {
    let mut handles = vec![];

    start_tcp_server!(
        Handles: handles,
        EndWith: "\r\n\r\n",
        Response: {
            Status: 200,
            Version: "HTTP/1.1",
            Body: "",
        },
        Shutdown: std::net::Shutdown::Both,
    );
    let handle = handles.pop().expect("No more handles !");

    let eyeballs = HappyEyeballs::new(
        vec![SocketAddr::from_str(handle.addr.as_str()).unwrap()],
        EyeBallConfig::new(None, None),
    );
    let handle = ylong_runtime::spawn(async move {
        let tcp = tcp_stream(eyeballs).await.unwrap();
        let res = tunnel(
            tcp,
            "ylong_http.com",
            443,
            Some(String::from("base64 bytes")),
        )
        .await;
        assert!(res.is_ok());
        // Wait for the server side to report that it has finished.
        handle
            .server_shutdown
            .recv()
            .expect("server send order failed !");
    });
    ylong_runtime::block_on(handle).unwrap();
}
770
/// UT test cases for response beyond size of `tunnel`.
///
/// # Brief
/// 1. Starts a test server answering `200` with an 8192-byte body, so the
///    whole response is larger than the 8192-byte buffer used by `tunnel`.
/// 2. Creates a `tcp stream` by calling `tcp_stream`.
/// 3. Sends a `CONNECT` request by `tunnel`.
/// 4. Checks that the error is `CreateTunnelErr::ProxyHeadersTooLong`.
#[test]
fn ut_ssl_tunnel_resp_beyond_size() {
    let mut handles = vec![];

    // Body as large as the read buffer of `tunnel`.
    let buf = vec![b'b'; 8192];
    let body = String::from_utf8(buf).unwrap();

    start_tcp_server!(
        Handles: handles,
        EndWith: "\r\n\r\n",
        Response: {
            Status: 200,
            Version: "HTTP/1.1",
            Header: "Content-Length", "11",
            Body: body.as_str(),
        },
    );
    let handle = handles.pop().expect("No more handles !");

    let eyeballs = HappyEyeballs::new(
        vec![SocketAddr::from_str(handle.addr.as_str()).unwrap()],
        EyeBallConfig::new(None, None),
    );
    let handle = ylong_runtime::spawn(async move {
        let tcp = tcp_stream(eyeballs).await.unwrap();
        let res = tunnel(
            tcp,
            "ylong_http.com",
            443,
            Some(String::from("base64 bytes")),
        )
        .await;
        // Errors are compared through their `Debug` output.
        assert_eq!(
            format!("{:?}", res.err()),
            format!(
                "{:?}",
                Some(other_io_error(CreateTunnelErr::ProxyHeadersTooLong))
            )
        );
        handle
            .server_shutdown
            .recv()
            .expect("server send order failed !");
    });
    ylong_runtime::block_on(handle).unwrap();
}
823 }
824 }
825