1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::mem::take; 15 use std::sync::{Arc, Mutex}; 16 use std::time::Instant; 17 18 #[cfg(feature = "http3")] 19 use ylong_http::request::uri::Authority; 20 #[cfg(any(feature = "http2", feature = "http3"))] 21 use ylong_http::request::uri::Scheme; 22 use ylong_http::request::uri::Uri; 23 24 #[cfg(feature = "http3")] 25 use crate::async_impl::quic::QuicConn; 26 use crate::async_impl::Connector; 27 #[cfg(feature = "http3")] 28 use crate::async_impl::Response; 29 use crate::error::HttpClientError; 30 use crate::runtime::{AsyncRead, AsyncWrite}; 31 #[cfg(feature = "http3")] 32 use crate::util::alt_svc::{AltService, AltServiceMap}; 33 #[cfg(feature = "http2")] 34 use crate::util::config::H2Config; 35 #[cfg(feature = "http3")] 36 use crate::util::config::H3Config; 37 use crate::util::config::{HttpConfig, HttpVersion}; 38 use crate::util::dispatcher::http1::{WrappedSemPermit, WrappedSemaphore}; 39 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher, TimeInfoConn}; 40 use crate::util::pool::{Pool, PoolKey}; 41 use crate::util::progress::SpeedConfig; 42 #[cfg(feature = "http3")] 43 use crate::util::request::RequestArc; 44 use crate::util::ConnInfo; 45 #[cfg(feature = "http2")] 46 use crate::ConnDetail; 47 use crate::TimeGroup; 48 49 pub(crate) struct ConnPool<C, S> { 50 pool: Pool<PoolKey, Conns<S>>, 51 #[cfg(feature = "http3")] 52 alt_svcs: AltServiceMap, 53 connector: Arc<C>, 54 config: HttpConfig, 55 } 56 57 impl<C: Connector> ConnPool<C, C::Stream> { new(config: HttpConfig, connector: C) -> Self58 pub(crate) fn new(config: HttpConfig, connector: C) -> Self { 59 Self { 60 pool: Pool::new(), 61 #[cfg(feature = "http3")] 62 alt_svcs: AltServiceMap::new(), 63 connector: Arc::new(connector), 64 config, 65 } 66 } 67 connect_to( &self, uri: &Uri, ) -> Result<TimeInfoConn<C::Stream>, HttpClientError>68 pub(crate) async fn connect_to( 69 &self, 70 uri: &Uri, 71 ) -> Result<TimeInfoConn<C::Stream>, HttpClientError> { 72 let key = PoolKey::new( 73 uri.scheme().unwrap().clone(), 74 uri.authority().unwrap().clone(), 75 ); 76 77 #[cfg(feature = "http3")] 78 let alt_svc = self.alt_svcs.get_alt_svcs(&key); 79 self.pool 80 .get( 81 key, 82 Conns::new, 83 self.config.http1_config.max_conn_num(), 84 self.config.speed_config, 85 ) 86 .conn( 87 self.config.clone(), 88 self.connector.clone(), 89 uri, 90 #[cfg(feature = "http3")] 91 alt_svc, 92 ) 93 .await 94 } 95 96 #[cfg(feature = "http3")] set_alt_svcs(&self, request: RequestArc, response: &Response)97 pub(crate) fn set_alt_svcs(&self, request: RequestArc, response: &Response) { 98 self.alt_svcs.set_alt_svcs(request, response); 99 } 100 } 101 102 pub(crate) enum H1ConnOption<T> { 103 Some(T), 104 None(WrappedSemPermit), 105 } 106 107 pub(crate) struct Conns<S> { 108 speed_config: SpeedConfig, 109 usable: WrappedSemaphore, 110 list: Arc<Mutex<Vec<ConnDispatcher<S>>>>, 111 #[cfg(feature = "http2")] 112 h2_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>, 113 #[cfg(feature = "http3")] 114 h3_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>, 115 } 116 117 impl<S> Conns<S> { new(max_conn_num: usize, speed_config: SpeedConfig) -> Self118 fn new(max_conn_num: usize, speed_config: SpeedConfig) -> Self { 119 Self { 120 speed_config, 121 usable: WrappedSemaphore::new(max_conn_num), 122 123 list: Arc::new(Mutex::new(Vec::new())), 124 125 #[cfg(feature = "http2")] 126 h2_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))), 127 128 #[cfg(feature = "http3")] 129 h3_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))), 130 } 131 } 132 133 // fn get_alt_svcs 134 } 135 136 impl<S> Clone for Conns<S> { clone(&self) -> Self137 fn clone(&self) -> Self { 138 Self { 139 speed_config: self.speed_config, 140 usable: self.usable.clone(), 141 list: self.list.clone(), 142 143 #[cfg(feature = "http2")] 144 h2_conn: self.h2_conn.clone(), 145 146 #[cfg(feature = "http3")] 147 h3_conn: self.h3_conn.clone(), 148 } 149 } 150 } 151 152 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Send + Sync + 'static> Conns<S> { conn<C>( &mut self, config: HttpConfig, connector: Arc<C>, url: &Uri, #[cfg(feature = "http3")] alt_svc: Option<Vec<AltService>>, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,153 async fn conn<C>( 154 &mut self, 155 config: HttpConfig, 156 connector: Arc<C>, 157 url: &Uri, 158 #[cfg(feature = "http3")] alt_svc: Option<Vec<AltService>>, 159 ) -> Result<TimeInfoConn<S>, HttpClientError> 160 where 161 C: Connector<Stream = S>, 162 { 163 let conn_start = Instant::now(); 164 let mut conn = match config.version { 165 #[cfg(feature = "http3")] 166 HttpVersion::Http3 => self.conn_h3(connector, url, config.http3_config).await, 167 #[cfg(feature = "http2")] 168 HttpVersion::Http2 => self.conn_h2(connector, url, config.http2_config).await, 169 #[cfg(feature = "http1_1")] 170 HttpVersion::Http1 => self.conn_h1(connector, url).await, 171 #[cfg(all(feature = "http1_1", not(feature = "http2")))] 172 HttpVersion::Negotiate => self.conn_h1(connector, url).await, 173 #[cfg(all(feature = "http1_1", feature = "http2"))] 174 HttpVersion::Negotiate => { 175 #[cfg(feature = "http3")] 176 if let Some(mut conn) = self 177 .conn_alt_svc(&connector, url, alt_svc, config.http3_config) 178 .await 179 { 180 conn.time_group_mut().set_connect_start(conn_start); 181 conn.time_group_mut().set_connect_end(Instant::now()); 182 return Ok(conn); 183 } 184 self.conn_negotiate(connector, url, config.http2_config) 185 .await 186 } 187 }?; 188 conn.time_group_mut().set_connect_start(conn_start); 189 conn.time_group_mut().set_connect_end(Instant::now()); 190 Ok(conn) 191 } 192 conn_h1<C>( &self, connector: Arc<C>, url: &Uri, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,193 async fn conn_h1<C>( 194 &self, 195 connector: Arc<C>, 196 url: &Uri, 197 ) -> Result<TimeInfoConn<S>, HttpClientError> 198 where 199 C: Connector<Stream = S>, 200 { 201 let semaphore = self.usable.acquire().await; 202 match self.exist_h1_conn(semaphore) { 203 H1ConnOption::Some(conn) => Ok(TimeInfoConn::new(conn, TimeGroup::default())), 204 H1ConnOption::None(permit) => { 205 let stream = connector.connect(url, HttpVersion::Http1).await?; 206 let time_group = take(stream.conn_data().time_group_mut()); 207 208 let dispatcher = ConnDispatcher::http1(stream); 209 let conn = self.dispatch_h1_conn(dispatcher, permit); 210 Ok(TimeInfoConn::new(conn, time_group)) 211 } 212 } 213 } 214 215 #[cfg(feature = "http2")] conn_h2<C>( &self, connector: Arc<C>, url: &Uri, config: H2Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,216 async fn conn_h2<C>( 217 &self, 218 connector: Arc<C>, 219 url: &Uri, 220 config: H2Config, 221 ) -> Result<TimeInfoConn<S>, HttpClientError> 222 where 223 C: Connector<Stream = S>, 224 { 225 // The lock `h2_occupation` is used to prevent multiple coroutines from sending 226 // Requests at the same time under concurrent conditions, 227 // resulting in the creation of multiple tcp connections 228 let mut lock = self.h2_conn.lock().await; 229 230 if let Some(conn) = self.exist_h2_conn(&mut lock) { 231 return Ok(TimeInfoConn::new(conn, TimeGroup::default())); 232 } 233 let stream = connector.connect(url, HttpVersion::Http2).await?; 234 let mut data = stream.conn_data(); 235 let tls = if let Some(scheme) = url.scheme() { 236 *scheme == Scheme::HTTPS 237 } else { 238 false 239 }; 240 match data.negotiate().alpn() { 241 None if tls => return err_from_msg!(Connect, "The peer does not support http/2."), 242 Some(protocol) if protocol != b"h2" => { 243 return err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.") 244 } 245 _ => {} 246 } 247 let time_group = take(data.time_group_mut()); 248 let conn = self.dispatch_h2_conn(data.detail(), config, stream, &mut lock); 249 Ok(TimeInfoConn::new(conn, time_group)) 250 } 251 252 #[cfg(feature = "http3")] conn_h3<C>( &self, connector: Arc<C>, url: &Uri, config: H3Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,253 async fn conn_h3<C>( 254 &self, 255 connector: Arc<C>, 256 url: &Uri, 257 config: H3Config, 258 ) -> Result<TimeInfoConn<S>, HttpClientError> 259 where 260 C: Connector<Stream = S>, 261 { 262 let mut lock = self.h3_conn.lock().await; 263 264 if let Some(conn) = self.exist_h3_conn(&mut lock) { 265 return Ok(TimeInfoConn::new(conn, TimeGroup::default())); 266 } 267 let mut stream = connector.connect(url, HttpVersion::Http3).await?; 268 269 let quic_conn = stream.quic_conn().ok_or(HttpClientError::from_str( 270 crate::ErrorKind::Connect, 271 "QUIC connect failed", 272 ))?; 273 274 let mut data = stream.conn_data(); 275 let time_group = take(data.time_group_mut()); 276 Ok(TimeInfoConn::new( 277 self.dispatch_h3_conn(data.detail(), config, stream, quic_conn, &mut lock), 278 time_group, 279 )) 280 } 281 282 #[cfg(all(feature = "http2", feature = "http1_1"))] conn_negotiate<C>( &self, connector: Arc<C>, url: &Uri, h2_config: H2Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,283 async fn conn_negotiate<C>( 284 &self, 285 connector: Arc<C>, 286 url: &Uri, 287 h2_config: H2Config, 288 ) -> Result<TimeInfoConn<S>, HttpClientError> 289 where 290 C: Connector<Stream = S>, 291 { 292 match *url.scheme().unwrap() { 293 Scheme::HTTPS => { 294 let mut lock = self.h2_conn.lock().await; 295 if let Some(conn) = self.exist_h2_conn(&mut lock) { 296 return Ok(TimeInfoConn::new(conn, TimeGroup::default())); 297 } 298 let permit = self.usable.acquire().await; 299 let permit = match self.exist_h1_conn(permit) { 300 H1ConnOption::Some(conn) => { 301 return Ok(TimeInfoConn::new(conn, TimeGroup::default())); 302 } 303 H1ConnOption::None(permit) => permit, 304 }; 305 let stream = connector.connect(url, HttpVersion::Negotiate).await?; 306 let mut data = stream.conn_data(); 307 let time_group = take(data.time_group_mut()); 308 309 let protocol = data.negotiate().alpn().unwrap_or(b"http/1.1"); 310 if protocol == b"http/1.1" { 311 let dispatcher = ConnDispatcher::http1(stream); 312 Ok(TimeInfoConn::new( 313 self.dispatch_h1_conn(dispatcher, permit), 314 time_group, 315 )) 316 } else if protocol == b"h2" { 317 std::mem::drop(permit); 318 let conn = self.dispatch_h2_conn(data.detail(), h2_config, stream, &mut lock); 319 Ok(TimeInfoConn::new(conn, time_group)) 320 } else { 321 std::mem::drop(permit); 322 err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.") 323 } 324 } 325 Scheme::HTTP => self.conn_h1(connector, url).await, 326 } 327 } 328 329 #[cfg(feature = "http3")] conn_alt_svc<C>( &self, connector: &Arc<C>, url: &Uri, alt_svcs: Option<Vec<AltService>>, h3_config: H3Config, ) -> Option<TimeInfoConn<S>> where C: Connector<Stream = S>,330 async fn conn_alt_svc<C>( 331 &self, 332 connector: &Arc<C>, 333 url: &Uri, 334 alt_svcs: Option<Vec<AltService>>, 335 h3_config: H3Config, 336 ) -> Option<TimeInfoConn<S>> 337 where 338 C: Connector<Stream = S>, 339 { 340 let mut lock = self.h3_conn.lock().await; 341 if let Some(conn) = self.exist_h3_conn(&mut lock) { 342 return Some(TimeInfoConn::new(conn, TimeGroup::default())); 343 } 344 if let Some(alt_svcs) = alt_svcs { 345 for alt_svc in alt_svcs { 346 // only support h3 alt_svc now 347 if alt_svc.http_version != HttpVersion::Http3 { 348 continue; 349 } 350 let scheme = Scheme::HTTPS; 351 let host = match alt_svc.host { 352 Some(ref host) => host.clone(), 353 None => url.host().cloned().unwrap(), 354 }; 355 let port = alt_svc.port.clone(); 356 let authority = 357 Authority::from_bytes((host.to_string() + ":" + port.as_str()).as_bytes()) 358 .ok()?; 359 let path = url.path().cloned(); 360 let query = url.query().cloned(); 361 let alt_url = Uri::from_raw_parts(Some(scheme), Some(authority), path, query); 362 let mut stream = connector.connect(&alt_url, HttpVersion::Http3).await.ok()?; 363 let quic_conn = stream.quic_conn().unwrap(); 364 let mut data = stream.conn_data(); 365 let time_group = take(data.time_group_mut()); 366 return Some(TimeInfoConn::new( 367 self.dispatch_h3_conn( 368 data.detail(), 369 h3_config.clone(), 370 stream, 371 quic_conn, 372 &mut lock, 373 ), 374 time_group, 375 )); 376 } 377 } 378 None 379 } 380 dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>, permit: WrappedSemPermit) -> Conn<S>381 fn dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>, permit: WrappedSemPermit) -> Conn<S> { 382 // We must be able to get the `Conn` here. 383 let mut conn = dispatcher.dispatch().unwrap(); 384 let mut list = self.list.lock().unwrap(); 385 list.push(dispatcher); 386 #[cfg(any(feature = "http2", feature = "http3"))] 387 if let Conn::Http1(ref mut h1) = conn { 388 h1.speed_controller.set_speed_limit(self.speed_config); 389 h1.occupy_sem(permit) 390 } 391 #[cfg(all(not(feature = "http2"), not(feature = "http3")))] 392 { 393 let Conn::Http1(ref mut h1) = conn; 394 h1.speed_controller.set_speed_limit(self.speed_config); 395 h1.occupy_sem(permit) 396 } 397 conn 398 } 399 400 #[cfg(feature = "http2")] dispatch_h2_conn( &self, detail: ConnDetail, config: H2Config, stream: S, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Conn<S>401 fn dispatch_h2_conn( 402 &self, 403 detail: ConnDetail, 404 config: H2Config, 405 stream: S, 406 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 407 ) -> Conn<S> { 408 let dispatcher = ConnDispatcher::http2(detail, config, stream); 409 let mut conn = dispatcher.dispatch().unwrap(); 410 lock.push(dispatcher); 411 if let Conn::Http2(ref mut h2) = conn { 412 h2.speed_controller.set_speed_limit(self.speed_config); 413 } 414 conn 415 } 416 417 #[cfg(feature = "http3")] dispatch_h3_conn( &self, detail: ConnDetail, config: H3Config, stream: S, quic_connection: QuicConn, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Conn<S>418 fn dispatch_h3_conn( 419 &self, 420 detail: ConnDetail, 421 config: H3Config, 422 stream: S, 423 quic_connection: QuicConn, 424 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 425 ) -> Conn<S> { 426 let dispatcher = ConnDispatcher::http3(detail, config, stream, quic_connection); 427 let mut conn = dispatcher.dispatch().unwrap(); 428 lock.push(dispatcher); 429 if let Conn::Http3(ref mut h3) = conn { 430 h3.speed_controller.set_speed_limit(self.speed_config); 431 } 432 conn 433 } 434 exist_h1_conn(&self, permit: WrappedSemPermit) -> H1ConnOption<Conn<S>>435 fn exist_h1_conn(&self, permit: WrappedSemPermit) -> H1ConnOption<Conn<S>> { 436 let mut list = self.list.lock().unwrap(); 437 let mut conn = None; 438 let curr = take(&mut *list); 439 // TODO Distinguish between http2 connections and http1 connections. 440 for dispatcher in curr.into_iter() { 441 // Discard invalid dispatchers. 442 if dispatcher.is_shutdown() { 443 continue; 444 } 445 if conn.is_none() { 446 conn = dispatcher.dispatch(); 447 } 448 list.push(dispatcher); 449 } 450 match conn { 451 Some(Conn::Http1(mut h1)) => { 452 h1.occupy_sem(permit); 453 h1.speed_controller.set_speed_limit(self.speed_config); 454 H1ConnOption::Some(Conn::Http1(h1)) 455 } 456 _ => H1ConnOption::None(permit), 457 } 458 } 459 460 #[cfg(feature = "http2")] exist_h2_conn( &self, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Option<Conn<S>>461 fn exist_h2_conn( 462 &self, 463 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 464 ) -> Option<Conn<S>> { 465 if let Some(dispatcher) = lock.pop() { 466 if dispatcher.is_shutdown() { 467 return None; 468 } 469 if !dispatcher.is_goaway() { 470 if let Some(Conn::Http2(mut h2)) = dispatcher.dispatch() { 471 lock.push(dispatcher); 472 h2.speed_controller.set_speed_limit(self.speed_config); 473 return Some(Conn::Http2(h2)); 474 } 475 } 476 lock.push(dispatcher); 477 } 478 None 479 } 480 481 #[cfg(feature = "http3")] exist_h3_conn( &self, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Option<Conn<S>>482 fn exist_h3_conn( 483 &self, 484 lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, 485 ) -> Option<Conn<S>> { 486 if let Some(dispatcher) = lock.pop() { 487 if dispatcher.is_shutdown() { 488 return None; 489 } 490 if !dispatcher.is_goaway() { 491 if let Some(Conn::Http3(mut h3)) = dispatcher.dispatch() { 492 lock.push(dispatcher); 493 h3.speed_controller.set_speed_limit(self.speed_config); 494 return Some(Conn::Http3(h3)); 495 } 496 } 497 // Not all requests have been processed yet 498 lock.push(dispatcher); 499 } 500 None 501 } 502 } 503