• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 //! Happy Eyeballs implementation.
15 
16 use core::time::Duration;
17 use std::future::Future;
18 use std::io;
19 use std::io::{Error, ErrorKind};
20 use std::net::SocketAddr;
21 use std::ops::{Deref, DerefMut};
22 
23 use crate::async_impl::dns::resolver::ResolvedAddrs;
24 use crate::runtime::{Sleep, TcpStream};
25 
26 const HAPPY_EYEBALLS_PREFERRED_TIMEOUT_MS: u64 = 300;
27 
28 pub(crate) struct HappyEyeballs {
29     preferred_addr: RemoteAddrs,
30     delay_addr: Option<DelayedAddrs>,
31 }
32 
33 struct RemoteAddrs {
34     addrs: DomainAddrs,
35     // timeout of each socket address.
36     timeout: Option<Duration>,
37 }
38 
39 struct DelayedAddrs {
40     addrs: RemoteAddrs,
41     delay: Sleep,
42 }
43 
44 pub(crate) struct EyeBallConfig {
45     // the timeout period for the entire connection establishment.
46     timeout: Option<Duration>,
47     // Delay to start other address family when the preferred address family is not complete
48     delay: Option<Duration>,
49 }
50 
51 struct DomainAddrs {
52     addrs: Vec<SocketAddr>,
53 }
54 
55 struct DomainAddrsIter<'a> {
56     iter: core::slice::Iter<'a, SocketAddr>,
57 }
58 
59 impl DomainAddrs {
new(addrs: Vec<SocketAddr>) -> Self60     pub(crate) fn new(addrs: Vec<SocketAddr>) -> Self {
61         Self { addrs }
62     }
63 
iter(&self) -> DomainAddrsIter64     pub(crate) fn iter(&self) -> DomainAddrsIter {
65         DomainAddrsIter {
66             iter: self.addrs.iter(),
67         }
68     }
69 }
70 
71 impl<'a> Iterator for DomainAddrsIter<'a> {
72     type Item = &'a SocketAddr;
73 
next(&mut self) -> Option<Self::Item>74     fn next(&mut self) -> Option<Self::Item> {
75         self.iter.next()
76     }
77 }
78 
79 impl<'a> Deref for DomainAddrsIter<'a> {
80     type Target = core::slice::Iter<'a, SocketAddr>;
81 
deref(&self) -> &Self::Target82     fn deref(&self) -> &Self::Target {
83         &self.iter
84     }
85 }
86 
87 impl<'a> DerefMut for DomainAddrsIter<'a> {
deref_mut(&mut self) -> &mut Self::Target88     fn deref_mut(&mut self) -> &mut Self::Target {
89         &mut self.iter
90     }
91 }
92 
93 impl DelayedAddrs {
new(addrs: RemoteAddrs, delay: Sleep) -> Self94     pub(crate) fn new(addrs: RemoteAddrs, delay: Sleep) -> Self {
95         DelayedAddrs { addrs, delay }
96     }
97 }
98 
99 impl EyeBallConfig {
new(timeout: Option<Duration>, delay: Option<Duration>) -> Self100     pub(crate) fn new(timeout: Option<Duration>, delay: Option<Duration>) -> Self {
101         Self { timeout, delay }
102     }
103 }
104 
105 impl RemoteAddrs {
new(addrs: Vec<SocketAddr>, timeout: Option<Duration>) -> Self106     fn new(addrs: Vec<SocketAddr>, timeout: Option<Duration>) -> Self {
107         Self {
108             addrs: DomainAddrs::new(addrs),
109             timeout,
110         }
111     }
112 
connect(&mut self) -> Result<TcpStream, io::Error>113     async fn connect(&mut self) -> Result<TcpStream, io::Error> {
114         let mut unexpected = None;
115         for addr in self.addrs.iter() {
116             match connect(addr, self.timeout).await {
117                 Ok(stream) => {
118                     return Ok(stream);
119                 }
120                 Err(e) => {
121                     unexpected = Some(e);
122                 }
123             }
124         }
125         match unexpected {
126             None => Err(Error::new(ErrorKind::NotConnected, "Invalid domain")),
127             Some(e) => Err(e),
128         }
129     }
130 }
131 
132 impl HappyEyeballs {
new(socket_addr: Vec<SocketAddr>, config: EyeBallConfig) -> Self133     pub(crate) fn new(socket_addr: Vec<SocketAddr>, config: EyeBallConfig) -> Self {
134         let socket_addr = ResolvedAddrs::new(socket_addr.into_iter());
135         // splits SocketAddrs into preferred and other family.
136         let (preferred, second) = socket_addr.split_preferred_addrs();
137         let preferred_size = preferred.len();
138         let second_size = second.len();
139         if second.is_empty() {
140             HappyEyeballs {
141                 preferred_addr: RemoteAddrs::new(
142                     preferred,
143                     config
144                         .timeout
145                         .and_then(|time| time.checked_div(preferred_size as u32)),
146                 ),
147                 delay_addr: None,
148             }
149         } else {
150             let delay = if let Some(delay) = config.delay {
151                 delay
152             } else {
153                 Duration::from_millis(HAPPY_EYEBALLS_PREFERRED_TIMEOUT_MS)
154             };
155             HappyEyeballs {
156                 preferred_addr: RemoteAddrs::new(
157                     preferred,
158                     config
159                         .timeout
160                         .and_then(|time| time.checked_div(preferred_size as u32)),
161                 ),
162                 // TODO Is it necessary to subtract the delay time
163                 delay_addr: Some(DelayedAddrs::new(
164                     RemoteAddrs::new(
165                         second,
166                         config
167                             .timeout
168                             .and_then(|time| time.checked_div(second_size as u32)),
169                     ),
170                     crate::runtime::sleep(delay),
171                 )),
172             }
173         }
174     }
175 
connect(mut self) -> io::Result<TcpStream>176     pub(crate) async fn connect(mut self) -> io::Result<TcpStream> {
177         match self.delay_addr {
178             None => self.preferred_addr.connect().await,
179             Some(mut second_addrs) => {
180                 let preferred_fut = self.preferred_addr.connect();
181                 let second_fut = second_addrs.addrs.connect();
182                 let delay_fut = second_addrs.delay;
183 
184                 #[cfg(feature = "ylong_base")]
185                 let (stream, stream_fut) = ylong_runtime::select! {
186                     preferred = preferred_fut => {
187                         (preferred, second_fut)
188                     },
189                     _ = delay_fut => {
190                         let preferred_fut = self.preferred_addr.connect();
191                         ylong_runtime::select! {
192                             preferred = preferred_fut => {
193                                 let second_fut = second_addrs.addrs.connect();
194                                 (preferred, second_fut)
195                             },
196                             second = second_fut => {
197                                 let preferred_fut = self.preferred_addr.connect();
198                                 (second, preferred_fut)
199                             },
200                         }
201                     },
202                 };
203 
204                 #[cfg(feature = "tokio_base")]
205                 let (stream, stream_fut) = tokio::select! {
206                     preferred = preferred_fut => {
207                         (preferred, second_fut)
208                     },
209                     _ = delay_fut => {
210                         let preferred_fut = self.preferred_addr.connect();
211                         tokio::select! {
212                             preferred = preferred_fut => {
213                                 let second_fut = second_addrs.addrs.connect();
214                                 (preferred, second_fut)
215                             },
216                             second = second_fut => {
217                                 let preferred_fut = self.preferred_addr.connect();
218                                 (second, preferred_fut)
219                             },
220                         }
221                     },
222                 };
223 
224                 if stream.is_err() {
225                     stream_fut.await
226                 } else {
227                     stream
228                 }
229             }
230         }
231     }
232 }
233 
connect( addr: &SocketAddr, timeout: Option<Duration>, ) -> impl Future<Output = io::Result<TcpStream>>234 fn connect(
235     addr: &SocketAddr,
236     timeout: Option<Duration>,
237 ) -> impl Future<Output = io::Result<TcpStream>> {
238     let stream_fut = TcpStream::connect(*addr);
239     async move {
240         match timeout {
241             None => stream_fut.await,
242             Some(duration) => match crate::runtime::timeout(duration, stream_fut).await {
243                 Ok(Ok(result)) => Ok(result),
244                 Ok(Err(e)) => Err(e),
245                 Err(e) => Err(io::Error::new(ErrorKind::TimedOut, e)),
246             },
247         }
248     }
249 }
250