1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 //! Happy Eyeballs implementation.
15
16 use core::time::Duration;
17 use std::future::Future;
18 use std::io;
19 use std::io::{Error, ErrorKind};
20 use std::net::SocketAddr;
21 use std::ops::{Deref, DerefMut};
22
23 use crate::async_impl::dns::resolver::ResolvedAddrs;
24 use crate::runtime::{Sleep, TcpStream};
25
/// Default length, in milliseconds, of the delay timer created by
/// `HappyEyeballs::new` when `EyeBallConfig::delay` is `None`.
const HAPPY_EYEBALLS_PREFERRED_TIMEOUT_MS: u64 = 300;
27
/// Connection state for one Happy Eyeballs attempt: the preferred address
/// family plus, optionally, the other family guarded by a delay timer.
pub(crate) struct HappyEyeballs {
    // Addresses of the preferred family.
    preferred_addr: RemoteAddrs,
    // Addresses of the other family together with their start delay;
    // `None` when the split produced no addresses for the other family.
    delay_addr: Option<DelayedAddrs>,
}
32
/// A list of socket addresses of one family and the timeout used for each
/// individual connection attempt.
struct RemoteAddrs {
    // Addresses that `connect` tries one after another.
    addrs: DomainAddrs,
    // Timeout of each socket address; `None` means the attempt is not wrapped
    // in a timeout.
    timeout: Option<Duration>,
}
38
/// Addresses of the non-preferred family paired with the timer that is awaited
/// before they are raced against the preferred family.
struct DelayedAddrs {
    // Addresses of the non-preferred family.
    addrs: RemoteAddrs,
    // Timer future created with `crate::runtime::sleep`.
    delay: Sleep,
}
43
/// Timing configuration consumed by `HappyEyeballs::new`.
pub(crate) struct EyeBallConfig {
    // The timeout period for the entire connection establishment. It is
    // divided by the number of addresses of a family to obtain the timeout of
    // each single attempt.
    timeout: Option<Duration>,
    // Delay before the other address family is started while the preferred
    // address family has not completed yet. `None` selects the default of
    // `HAPPY_EYEBALLS_PREFERRED_TIMEOUT_MS` milliseconds.
    delay: Option<Duration>,
}
50
/// Owned list of socket addresses.
struct DomainAddrs {
    // Addresses in the order in which they are iterated.
    addrs: Vec<SocketAddr>,
}
54
/// Borrowing iterator over the addresses stored in a `DomainAddrs`.
struct DomainAddrsIter<'a> {
    // Underlying slice iterator; also reachable through `Deref`/`DerefMut`.
    iter: core::slice::Iter<'a, SocketAddr>,
}
58
59 impl DomainAddrs {
new(addrs: Vec<SocketAddr>) -> Self60 pub(crate) fn new(addrs: Vec<SocketAddr>) -> Self {
61 Self { addrs }
62 }
63
iter(&self) -> DomainAddrsIter64 pub(crate) fn iter(&self) -> DomainAddrsIter {
65 DomainAddrsIter {
66 iter: self.addrs.iter(),
67 }
68 }
69 }
70
71 impl<'a> Iterator for DomainAddrsIter<'a> {
72 type Item = &'a SocketAddr;
73
next(&mut self) -> Option<Self::Item>74 fn next(&mut self) -> Option<Self::Item> {
75 self.iter.next()
76 }
77 }
78
79 impl<'a> Deref for DomainAddrsIter<'a> {
80 type Target = core::slice::Iter<'a, SocketAddr>;
81
deref(&self) -> &Self::Target82 fn deref(&self) -> &Self::Target {
83 &self.iter
84 }
85 }
86
87 impl<'a> DerefMut for DomainAddrsIter<'a> {
deref_mut(&mut self) -> &mut Self::Target88 fn deref_mut(&mut self) -> &mut Self::Target {
89 &mut self.iter
90 }
91 }
92
93 impl DelayedAddrs {
new(addrs: RemoteAddrs, delay: Sleep) -> Self94 pub(crate) fn new(addrs: RemoteAddrs, delay: Sleep) -> Self {
95 DelayedAddrs { addrs, delay }
96 }
97 }
98
99 impl EyeBallConfig {
new(timeout: Option<Duration>, delay: Option<Duration>) -> Self100 pub(crate) fn new(timeout: Option<Duration>, delay: Option<Duration>) -> Self {
101 Self { timeout, delay }
102 }
103 }
104
105 impl RemoteAddrs {
new(addrs: Vec<SocketAddr>, timeout: Option<Duration>) -> Self106 fn new(addrs: Vec<SocketAddr>, timeout: Option<Duration>) -> Self {
107 Self {
108 addrs: DomainAddrs::new(addrs),
109 timeout,
110 }
111 }
112
connect(&mut self) -> Result<TcpStream, io::Error>113 async fn connect(&mut self) -> Result<TcpStream, io::Error> {
114 let mut unexpected = None;
115 for addr in self.addrs.iter() {
116 match connect(addr, self.timeout).await {
117 Ok(stream) => {
118 return Ok(stream);
119 }
120 Err(e) => {
121 unexpected = Some(e);
122 }
123 }
124 }
125 match unexpected {
126 None => Err(Error::new(ErrorKind::NotConnected, "Invalid domain")),
127 Some(e) => Err(e),
128 }
129 }
130 }
131
132 impl HappyEyeballs {
new(socket_addr: Vec<SocketAddr>, config: EyeBallConfig) -> Self133 pub(crate) fn new(socket_addr: Vec<SocketAddr>, config: EyeBallConfig) -> Self {
134 let socket_addr = ResolvedAddrs::new(socket_addr.into_iter());
135 // splits SocketAddrs into preferred and other family.
136 let (preferred, second) = socket_addr.split_preferred_addrs();
137 let preferred_size = preferred.len();
138 let second_size = second.len();
139 if second.is_empty() {
140 HappyEyeballs {
141 preferred_addr: RemoteAddrs::new(
142 preferred,
143 config
144 .timeout
145 .and_then(|time| time.checked_div(preferred_size as u32)),
146 ),
147 delay_addr: None,
148 }
149 } else {
150 let delay = if let Some(delay) = config.delay {
151 delay
152 } else {
153 Duration::from_millis(HAPPY_EYEBALLS_PREFERRED_TIMEOUT_MS)
154 };
155 HappyEyeballs {
156 preferred_addr: RemoteAddrs::new(
157 preferred,
158 config
159 .timeout
160 .and_then(|time| time.checked_div(preferred_size as u32)),
161 ),
162 // TODO Is it necessary to subtract the delay time
163 delay_addr: Some(DelayedAddrs::new(
164 RemoteAddrs::new(
165 second,
166 config
167 .timeout
168 .and_then(|time| time.checked_div(second_size as u32)),
169 ),
170 crate::runtime::sleep(delay),
171 )),
172 }
173 }
174 }
175
connect(mut self) -> io::Result<TcpStream>176 pub(crate) async fn connect(mut self) -> io::Result<TcpStream> {
177 match self.delay_addr {
178 None => self.preferred_addr.connect().await,
179 Some(mut second_addrs) => {
180 let preferred_fut = self.preferred_addr.connect();
181 let second_fut = second_addrs.addrs.connect();
182 let delay_fut = second_addrs.delay;
183
184 #[cfg(feature = "ylong_base")]
185 let (stream, stream_fut) = ylong_runtime::select! {
186 preferred = preferred_fut => {
187 (preferred, second_fut)
188 },
189 _ = delay_fut => {
190 let preferred_fut = self.preferred_addr.connect();
191 ylong_runtime::select! {
192 preferred = preferred_fut => {
193 let second_fut = second_addrs.addrs.connect();
194 (preferred, second_fut)
195 },
196 second = second_fut => {
197 let preferred_fut = self.preferred_addr.connect();
198 (second, preferred_fut)
199 },
200 }
201 },
202 };
203
204 #[cfg(feature = "tokio_base")]
205 let (stream, stream_fut) = tokio::select! {
206 preferred = preferred_fut => {
207 (preferred, second_fut)
208 },
209 _ = delay_fut => {
210 let preferred_fut = self.preferred_addr.connect();
211 tokio::select! {
212 preferred = preferred_fut => {
213 let second_fut = second_addrs.addrs.connect();
214 (preferred, second_fut)
215 },
216 second = second_fut => {
217 let preferred_fut = self.preferred_addr.connect();
218 (second, preferred_fut)
219 },
220 }
221 },
222 };
223
224 if stream.is_err() {
225 stream_fut.await
226 } else {
227 stream
228 }
229 }
230 }
231 }
232 }
233
connect( addr: &SocketAddr, timeout: Option<Duration>, ) -> impl Future<Output = io::Result<TcpStream>>234 fn connect(
235 addr: &SocketAddr,
236 timeout: Option<Duration>,
237 ) -> impl Future<Output = io::Result<TcpStream>> {
238 let stream_fut = TcpStream::connect(*addr);
239 async move {
240 match timeout {
241 None => stream_fut.await,
242 Some(duration) => match crate::runtime::timeout(duration, stream_fut).await {
243 Ok(Ok(result)) => Ok(result),
244 Ok(Err(e)) => Err(e),
245 Err(e) => Err(io::Error::new(ErrorKind::TimedOut, e)),
246 },
247 }
248 }
249 }
250