1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::mem::take; 15 use std::sync::{Arc, Mutex}; 16 use std::time::Instant; 17 18 #[cfg(feature = "http3")] 19 use ylong_http::request::uri::Authority; 20 #[cfg(any(feature = "http2", feature = "http3"))] 21 use ylong_http::request::uri::Scheme; 22 use ylong_http::request::uri::Uri; 23 24 #[cfg(feature = "http3")] 25 use crate::async_impl::quic::QuicConn; 26 use crate::async_impl::Connector; 27 #[cfg(feature = "http3")] 28 use crate::async_impl::Response; 29 use crate::error::HttpClientError; 30 use crate::runtime::{AsyncRead, AsyncWrite}; 31 #[cfg(feature = "http3")] 32 use crate::util::alt_svc::{AltService, AltServiceMap}; 33 #[cfg(feature = "http2")] 34 use crate::util::config::H2Config; 35 #[cfg(feature = "http3")] 36 use crate::util::config::H3Config; 37 use crate::util::config::{HttpConfig, HttpVersion}; 38 use crate::util::dispatcher::http1::{WrappedSemPermit, WrappedSemaphore}; 39 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher, TimeInfoConn}; 40 use crate::util::pool::{Pool, PoolKey}; 41 #[cfg(feature = "http3")] 42 use crate::util::request::RequestArc; 43 use crate::util::ConnInfo; 44 #[cfg(feature = "http2")] 45 use crate::ConnDetail; 46 use crate::TimeGroup; 47 48 pub(crate) struct ConnPool<C, S> { 49 pool: Pool<PoolKey, Conns<S>>, 50 #[cfg(feature = "http3")] 51 alt_svcs: AltServiceMap, 52 connector: Arc<C>, 53 config: HttpConfig, 54 } 55 56 impl<C: Connector> ConnPool<C, C::Stream> { new(config: HttpConfig, connector: C) -> Self57 pub(crate) fn new(config: HttpConfig, connector: C) -> Self { 58 Self { 59 pool: Pool::new(), 60 #[cfg(feature = "http3")] 61 alt_svcs: AltServiceMap::new(), 62 connector: Arc::new(connector), 63 config, 64 } 65 } 66 connect_to( &self, uri: &Uri, ) -> Result<TimeInfoConn<C::Stream>, HttpClientError>67 pub(crate) async fn connect_to( 68 &self, 69 uri: &Uri, 70 ) -> Result<TimeInfoConn<C::Stream>, HttpClientError> { 71 let key = PoolKey::new( 72 uri.scheme().unwrap().clone(), 73 uri.authority().unwrap().clone(), 74 ); 75 76 #[cfg(feature = "http3")] 77 let alt_svc = self.alt_svcs.get_alt_svcs(&key); 78 self.pool 79 .get(key, Conns::new, self.config.http1_config.max_conn_num()) 80 .conn( 81 self.config.clone(), 82 self.connector.clone(), 83 uri, 84 #[cfg(feature = "http3")] 85 alt_svc, 86 ) 87 .await 88 } 89 90 #[cfg(feature = "http3")] set_alt_svcs(&self, request: RequestArc, response: &Response)91 pub(crate) fn set_alt_svcs(&self, request: RequestArc, response: &Response) { 92 self.alt_svcs.set_alt_svcs(request, response); 93 } 94 } 95 96 pub(crate) enum H1ConnOption<T> { 97 Some(T), 98 None(WrappedSemPermit), 99 } 100 101 pub(crate) struct Conns<S> { 102 usable: WrappedSemaphore, 103 list: Arc<Mutex<Vec<ConnDispatcher<S>>>>, 104 #[cfg(feature = "http2")] 105 h2_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>, 106 #[cfg(feature = "http3")] 107 h3_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>, 108 } 109 110 impl<S> Conns<S> { new(max_conn_num: usize) -> Self111 fn new(max_conn_num: usize) -> Self { 112 Self { 113 usable: WrappedSemaphore::new(max_conn_num), 114 115 list: Arc::new(Mutex::new(Vec::new())), 116 117 #[cfg(feature = "http2")] 118 h2_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))), 119 120 #[cfg(feature = "http3")] 121 h3_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))), 122 } 123 } 124 125 // fn get_alt_svcs 126 } 127 128 impl<S> Clone for Conns<S> { clone(&self) -> Self129 fn clone(&self) -> Self { 130 Self { 131 usable: self.usable.clone(), 132 list: self.list.clone(), 133 134 #[cfg(feature = "http2")] 135 h2_conn: self.h2_conn.clone(), 136 137 #[cfg(feature = "http3")] 138 h3_conn: self.h3_conn.clone(), 139 } 140 } 141 } 142 143 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Send + Sync + 'static> Conns<S> { conn<C>( &mut self, config: HttpConfig, connector: Arc<C>, url: &Uri, #[cfg(feature = "http3")] alt_svc: Option<Vec<AltService>>, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,144 async fn conn<C>( 145 &mut self, 146 config: HttpConfig, 147 connector: Arc<C>, 148 url: &Uri, 149 #[cfg(feature = "http3")] alt_svc: Option<Vec<AltService>>, 150 ) -> Result<TimeInfoConn<S>, HttpClientError> 151 where 152 C: Connector<Stream = S>, 153 { 154 let conn_start = Instant::now(); 155 let mut conn = match config.version { 156 #[cfg(feature = "http3")] 157 HttpVersion::Http3 => self.conn_h3(connector, url, config.http3_config).await, 158 #[cfg(feature = "http2")] 159 HttpVersion::Http2 => self.conn_h2(connector, url, config.http2_config).await, 160 #[cfg(feature = "http1_1")] 161 HttpVersion::Http1 => self.conn_h1(connector, url).await, 162 #[cfg(all(feature = "http1_1", not(feature = "http2")))] 163 HttpVersion::Negotiate => self.conn_h1(connector, url).await, 164 #[cfg(all(feature = "http1_1", feature = "http2"))] 165 HttpVersion::Negotiate => { 166 #[cfg(feature = "http3")] 167 if let Some(mut conn) = self 168 .conn_alt_svc(&connector, url, alt_svc, config.http3_config) 169 .await 170 { 171 conn.time_group_mut().set_connect_start(conn_start); 172 conn.time_group_mut().set_connect_end(Instant::now()); 173 return Ok(conn); 174 } 175 self.conn_negotiate(connector, url, config.http2_config) 176 .await 177 } 178 }?; 179 conn.time_group_mut().set_connect_start(conn_start); 180 conn.time_group_mut().set_connect_end(Instant::now()); 181 Ok(conn) 182 } 183 conn_h1<C>( &self, connector: Arc<C>, url: &Uri, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,184 async fn conn_h1<C>( 185 &self, 186 connector: Arc<C>, 187 url: &Uri, 188 ) -> Result<TimeInfoConn<S>, HttpClientError> 189 where 190 C: Connector<Stream = S>, 191 { 192 let semaphore = self.usable.acquire().await; 193 match self.exist_h1_conn(semaphore) { 194 H1ConnOption::Some(conn) => Ok(TimeInfoConn::new(conn, TimeGroup::default())), 195 H1ConnOption::None(permit) => { 196 let stream = connector.connect(url, HttpVersion::Http1).await?; 197 let time_group = take(stream.conn_data().time_group_mut()); 198 199 let dispatcher = ConnDispatcher::http1(stream); 200 let conn = self.dispatch_h1_conn(dispatcher, permit); 201 Ok(TimeInfoConn::new(conn, time_group)) 202 } 203 } 204 } 205 206 #[cfg(feature = "http2")] conn_h2<C>( &self, connector: Arc<C>, url: &Uri, config: H2Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,207 async fn conn_h2<C>( 208 &self, 209 connector: Arc<C>, 210 url: &Uri, 211 config: H2Config, 212 ) -> Result<TimeInfoConn<S>, HttpClientError> 213 where 214 C: Connector<Stream = S>, 215 { 216 // The lock `h2_occupation` is used to prevent multiple coroutines from sending 217 // Requests at the same time under concurrent conditions, 218 // resulting in the creation of multiple tcp connections 219 let mut lock = self.h2_conn.lock().await; 220 221 if let Some(conn) = Self::exist_h2_conn(&mut lock) { 222 return Ok(TimeInfoConn::new(conn, TimeGroup::default())); 223 } 224 let stream = connector.connect(url, HttpVersion::Http2).await?; 225 let mut data = stream.conn_data(); 226 let tls = if let Some(scheme) = url.scheme() { 227 *scheme == Scheme::HTTPS 228 } else { 229 false 230 }; 231 match data.negotiate().alpn() { 232 None if tls => return err_from_msg!(Connect, "The peer does not support http/2."), 233 Some(protocol) if protocol != b"h2" => { 234 return err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.") 235 } 236 _ => {} 237 } 238 let time_group = take(data.time_group_mut()); 239 let conn = Self::dispatch_h2_conn(data.detail(), config, stream, &mut lock); 240 Ok(TimeInfoConn::new(conn, time_group)) 241 } 242 243 #[cfg(feature = "http3")] conn_h3<C>( &self, connector: Arc<C>, url: &Uri, config: H3Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,244 async fn conn_h3<C>( 245 &self, 246 connector: Arc<C>, 247 url: &Uri, 248 config: H3Config, 249 ) -> Result<TimeInfoConn<S>, HttpClientError> 250 where 251 C: Connector<Stream = S>, 252 { 253 let mut lock = self.h3_conn.lock().await; 254 255 if let Some(conn) = Self::exist_h3_conn(&mut lock) { 256 return Ok(TimeInfoConn::new(conn, TimeGroup::default())); 257 } 258 let mut stream = connector.connect(url, HttpVersion::Http3).await?; 259 260 let quic_conn = stream.quic_conn().ok_or(HttpClientError::from_str( 261 crate::ErrorKind::Connect, 262 "QUIC connect failed", 263 ))?; 264 265 let mut data = stream.conn_data(); 266 let time_group = take(data.time_group_mut()); 267 Ok(TimeInfoConn::new( 268 Self::dispatch_h3_conn(data.detail(), config, stream, quic_conn, &mut lock), 269 time_group, 270 )) 271 } 272 273 #[cfg(all(feature = "http2", feature = "http1_1"))] conn_negotiate<C>( &self, connector: Arc<C>, url: &Uri, h2_config: H2Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,274 async fn conn_negotiate<C>( 275 &self, 276 connector: Arc<C>, 277 url: &Uri, 278 h2_config: H2Config, 279 ) -> Result<TimeInfoConn<S>, HttpClientError> 280 where 281 C: Connector<Stream = S>, 282 { 283 match *url.scheme().unwrap() { 284 Scheme::HTTPS => { 285 let mut lock = self.h2_conn.lock().await; 286 if let Some(conn) = Self::exist_h2_conn(&mut lock) { 287 return Ok(TimeInfoConn::new(conn, TimeGroup::default())); 288 } 289 let permit = self.usable.acquire().await; 290 let permit = match self.exist_h1_conn(permit) { 291 H1ConnOption::Some(conn) => { 292 return Ok(TimeInfoConn::new(conn, TimeGroup::default())); 293 } 294 H1ConnOption::None(permit) => permit, 295 }; 296 let stream = connector.connect(url, HttpVersion::Negotiate).await?; 297 let mut data = stream.conn_data(); 298 let time_group = take(data.time_group_mut()); 299 300 let protocol = if let Some(bytes) = data.negotiate().alpn() { 301 bytes 302 } else { 303 let dispatcher = ConnDispatcher::http1(stream); 304 return Ok(TimeInfoConn::new( 305 self.dispatch_h1_conn(dispatcher, permit), 306 time_group, 307 )); 308 }; 309 310 if protocol == b"http/1.1" { 311 let dispatcher = ConnDispatcher::http1(stream); 312 Ok(TimeInfoConn::new( 313 self.dispatch_h1_conn(dispatcher, permit), 314 time_group, 315 )) 316 } else if protocol == b"h2" { 317 std::mem::drop(permit); 318 let conn = Self::dispatch_h2_conn(data.detail(), h2_config, stream, &mut lock); 319 Ok(TimeInfoConn::new(conn, time_group)) 320 } else { 321 std::mem::drop(permit); 322 err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.") 323 } 324 } 325 Scheme::HTTP => self.conn_h1(connector, url).await, 326 } 327 } 328 329 #[cfg(feature = "http3")] conn_alt_svc<C>( &self, connector: &Arc<C>, url: &Uri, alt_svcs: Option<Vec<AltService>>, h3_config: H3Config, ) -> Option<TimeInfoConn<S>> where C: Connector<Stream = S>,330 async fn conn_alt_svc<C>( 331 &self, 332 connector: &Arc<C>, 333 url: &Uri, 334 alt_svcs: Option<Vec<AltService>>, 335 h3_config: H3Config, 336 ) -> Option<TimeInfoConn<S>> 337 where 338 C: Connector<Stream = S>, 339 { 340 let mut lock = self.h3_conn.lock().await; 341 if let Some(conn) = Self::exist_h3_conn(&mut lock) { 342 return Some(TimeInfoConn::new(conn, TimeGroup::default())); 343 } 344 if let Some(alt_svcs) = alt_svcs { 345 for alt_svc in alt_svcs { 346 // only support h3 alt_svc now 347 if alt_svc.http_version != HttpVersion::Http3 { 348 continue; 349 } 350 let scheme = Scheme::HTTPS; 351 let host = match alt_svc.host { 352 Some(ref host) => host.clone(), 353 None => url.host().cloned().unwrap(), 354 }; 355 let port = alt_svc.port.clone(); 356 let authority = 357 Authority::from_bytes((host.to_string() + ":" + port.as_str()).as_bytes()) 358 .ok()?; 359 let path = url.path().cloned(); 360 let query = url.query().cloned(); 361 let alt_url = Uri::from_raw_parts(Some(scheme), Some(authority), path, query); 362 let mut stream = connector.connect(&alt_url, HttpVersion::Http3).await.ok()?; 363 let quic_conn = stream.quic_conn().unwrap(); 364 let mut data = stream.conn_data(); 365 let time_group = take(data.time_group_mut()); 366 return Some(TimeInfoConn::new( 367 Self::dispatch_h3_conn( 368 data.detail(), 369 h3_config.clone(), 370 stream, 371 quic_conn, 372 &mut lock, 373 ), 374 time_group, 375 )); 376 } 377 } 378 None 379 } 380 dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>, permit: WrappedSemPermit) -> Conn<S>381 fn dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>, permit: WrappedSemPermit) -> Conn<S> { 382 // We must be able to get the `Conn` here. 383 let mut conn = dispatcher.dispatch().unwrap(); 384 let mut list = self.list.lock().unwrap(); 385 list.push(dispatcher); 386 #[cfg(any(feature = "http2", feature = "http3"))] 387 if let Conn::Http1(ref mut h1) = conn { 388 h1.occupy_sem(permit) 389 } 390 #[cfg(all(not(feature = "http2"), not(feature = "http3")))] 391 { 392 let Conn::Http1(ref mut h1) = conn; 393 h1.occupy_sem(permit) 394 } 395 conn 396 } 397 398 #[cfg(feature = "http2")] dispatch_h2_conn( detail: ConnDetail, config: H2Config, stream: S, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Conn<S>399 fn dispatch_h2_conn( 400 detail: ConnDetail, 401 config: H2Config, 402 stream: S, 403 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 404 ) -> Conn<S> { 405 let dispatcher = ConnDispatcher::http2(detail, config, stream); 406 let conn = dispatcher.dispatch().unwrap(); 407 lock.push(dispatcher); 408 conn 409 } 410 411 #[cfg(feature = "http3")] dispatch_h3_conn( detail: ConnDetail, config: H3Config, stream: S, quic_connection: QuicConn, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Conn<S>412 fn dispatch_h3_conn( 413 detail: ConnDetail, 414 config: H3Config, 415 stream: S, 416 quic_connection: QuicConn, 417 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 418 ) -> Conn<S> { 419 let dispatcher = ConnDispatcher::http3(detail, config, stream, quic_connection); 420 let conn = dispatcher.dispatch().unwrap(); 421 lock.push(dispatcher); 422 conn 423 } 424 exist_h1_conn(&self, permit: WrappedSemPermit) -> H1ConnOption<Conn<S>>425 fn exist_h1_conn(&self, permit: WrappedSemPermit) -> H1ConnOption<Conn<S>> { 426 let mut list = self.list.lock().unwrap(); 427 let mut conn = None; 428 let curr = take(&mut *list); 429 // TODO Distinguish between http2 connections and http1 connections. 430 for dispatcher in curr.into_iter() { 431 // Discard invalid dispatchers. 432 if dispatcher.is_shutdown() { 433 continue; 434 } 435 if conn.is_none() { 436 conn = dispatcher.dispatch(); 437 } 438 list.push(dispatcher); 439 } 440 match conn { 441 Some(Conn::Http1(mut h1)) => { 442 h1.occupy_sem(permit); 443 H1ConnOption::Some(Conn::Http1(h1)) 444 } 445 _ => H1ConnOption::None(permit), 446 } 447 } 448 449 #[cfg(feature = "http2")] exist_h2_conn( lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Option<Conn<S>>450 fn exist_h2_conn( 451 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 452 ) -> Option<Conn<S>> { 453 if let Some(dispatcher) = lock.pop() { 454 if dispatcher.is_shutdown() { 455 return None; 456 } 457 if !dispatcher.is_goaway() { 458 if let Some(conn) = dispatcher.dispatch() { 459 lock.push(dispatcher); 460 return Some(conn); 461 } 462 } 463 lock.push(dispatcher); 464 } 465 None 466 } 467 468 #[cfg(feature = "http3")] exist_h3_conn( lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Option<Conn<S>>469 fn exist_h3_conn( 470 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 471 ) -> Option<Conn<S>> { 472 if let Some(dispatcher) = lock.pop() { 473 if dispatcher.is_shutdown() { 474 return None; 475 } 476 if !dispatcher.is_goaway() { 477 if let Some(conn) = dispatcher.dispatch() { 478 lock.push(dispatcher); 479 return Some(conn); 480 } 481 } 482 // Not all requests have been processed yet 483 lock.push(dispatcher); 484 } 485 None 486 } 487 } 488