• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::mem::take;
15 use std::sync::{Arc, Mutex};
16 use std::time::Instant;
17 
18 #[cfg(feature = "http3")]
19 use ylong_http::request::uri::Authority;
20 #[cfg(any(feature = "http2", feature = "http3"))]
21 use ylong_http::request::uri::Scheme;
22 use ylong_http::request::uri::Uri;
23 
24 #[cfg(feature = "http3")]
25 use crate::async_impl::quic::QuicConn;
26 use crate::async_impl::Connector;
27 #[cfg(feature = "http3")]
28 use crate::async_impl::Response;
29 use crate::error::HttpClientError;
30 use crate::runtime::{AsyncRead, AsyncWrite};
31 #[cfg(feature = "http3")]
32 use crate::util::alt_svc::{AltService, AltServiceMap};
33 #[cfg(feature = "http2")]
34 use crate::util::config::H2Config;
35 #[cfg(feature = "http3")]
36 use crate::util::config::H3Config;
37 use crate::util::config::{HttpConfig, HttpVersion};
38 use crate::util::dispatcher::http1::{WrappedSemPermit, WrappedSemaphore};
39 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher, TimeInfoConn};
40 use crate::util::pool::{Pool, PoolKey};
41 #[cfg(feature = "http3")]
42 use crate::util::request::RequestArc;
43 use crate::util::ConnInfo;
44 #[cfg(feature = "http2")]
45 use crate::ConnDetail;
46 use crate::TimeGroup;
47 
48 pub(crate) struct ConnPool<C, S> {
49     pool: Pool<PoolKey, Conns<S>>,
50     #[cfg(feature = "http3")]
51     alt_svcs: AltServiceMap,
52     connector: Arc<C>,
53     config: HttpConfig,
54 }
55 
56 impl<C: Connector> ConnPool<C, C::Stream> {
new(config: HttpConfig, connector: C) -> Self57     pub(crate) fn new(config: HttpConfig, connector: C) -> Self {
58         Self {
59             pool: Pool::new(),
60             #[cfg(feature = "http3")]
61             alt_svcs: AltServiceMap::new(),
62             connector: Arc::new(connector),
63             config,
64         }
65     }
66 
connect_to( &self, uri: &Uri, ) -> Result<TimeInfoConn<C::Stream>, HttpClientError>67     pub(crate) async fn connect_to(
68         &self,
69         uri: &Uri,
70     ) -> Result<TimeInfoConn<C::Stream>, HttpClientError> {
71         let key = PoolKey::new(
72             uri.scheme().unwrap().clone(),
73             uri.authority().unwrap().clone(),
74         );
75 
76         #[cfg(feature = "http3")]
77         let alt_svc = self.alt_svcs.get_alt_svcs(&key);
78         self.pool
79             .get(key, Conns::new, self.config.http1_config.max_conn_num())
80             .conn(
81                 self.config.clone(),
82                 self.connector.clone(),
83                 uri,
84                 #[cfg(feature = "http3")]
85                 alt_svc,
86             )
87             .await
88     }
89 
90     #[cfg(feature = "http3")]
set_alt_svcs(&self, request: RequestArc, response: &Response)91     pub(crate) fn set_alt_svcs(&self, request: RequestArc, response: &Response) {
92         self.alt_svcs.set_alt_svcs(request, response);
93     }
94 }
95 
96 pub(crate) enum H1ConnOption<T> {
97     Some(T),
98     None(WrappedSemPermit),
99 }
100 
101 pub(crate) struct Conns<S> {
102     usable: WrappedSemaphore,
103     list: Arc<Mutex<Vec<ConnDispatcher<S>>>>,
104     #[cfg(feature = "http2")]
105     h2_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>,
106     #[cfg(feature = "http3")]
107     h3_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>,
108 }
109 
110 impl<S> Conns<S> {
new(max_conn_num: usize) -> Self111     fn new(max_conn_num: usize) -> Self {
112         Self {
113             usable: WrappedSemaphore::new(max_conn_num),
114 
115             list: Arc::new(Mutex::new(Vec::new())),
116 
117             #[cfg(feature = "http2")]
118             h2_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))),
119 
120             #[cfg(feature = "http3")]
121             h3_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))),
122         }
123     }
124 
125     // fn get_alt_svcs
126 }
127 
128 impl<S> Clone for Conns<S> {
clone(&self) -> Self129     fn clone(&self) -> Self {
130         Self {
131             usable: self.usable.clone(),
132             list: self.list.clone(),
133 
134             #[cfg(feature = "http2")]
135             h2_conn: self.h2_conn.clone(),
136 
137             #[cfg(feature = "http3")]
138             h3_conn: self.h3_conn.clone(),
139         }
140     }
141 }
142 
143 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Send + Sync + 'static> Conns<S> {
conn<C>( &mut self, config: HttpConfig, connector: Arc<C>, url: &Uri, #[cfg(feature = "http3")] alt_svc: Option<Vec<AltService>>, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,144     async fn conn<C>(
145         &mut self,
146         config: HttpConfig,
147         connector: Arc<C>,
148         url: &Uri,
149         #[cfg(feature = "http3")] alt_svc: Option<Vec<AltService>>,
150     ) -> Result<TimeInfoConn<S>, HttpClientError>
151     where
152         C: Connector<Stream = S>,
153     {
154         let conn_start = Instant::now();
155         let mut conn = match config.version {
156             #[cfg(feature = "http3")]
157             HttpVersion::Http3 => self.conn_h3(connector, url, config.http3_config).await,
158             #[cfg(feature = "http2")]
159             HttpVersion::Http2 => self.conn_h2(connector, url, config.http2_config).await,
160             #[cfg(feature = "http1_1")]
161             HttpVersion::Http1 => self.conn_h1(connector, url).await,
162             #[cfg(all(feature = "http1_1", not(feature = "http2")))]
163             HttpVersion::Negotiate => self.conn_h1(connector, url).await,
164             #[cfg(all(feature = "http1_1", feature = "http2"))]
165             HttpVersion::Negotiate => {
166                 #[cfg(feature = "http3")]
167                 if let Some(mut conn) = self
168                     .conn_alt_svc(&connector, url, alt_svc, config.http3_config)
169                     .await
170                 {
171                     conn.time_group_mut().set_connect_start(conn_start);
172                     conn.time_group_mut().set_connect_end(Instant::now());
173                     return Ok(conn);
174                 }
175                 self.conn_negotiate(connector, url, config.http2_config)
176                     .await
177             }
178         }?;
179         conn.time_group_mut().set_connect_start(conn_start);
180         conn.time_group_mut().set_connect_end(Instant::now());
181         Ok(conn)
182     }
183 
conn_h1<C>( &self, connector: Arc<C>, url: &Uri, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,184     async fn conn_h1<C>(
185         &self,
186         connector: Arc<C>,
187         url: &Uri,
188     ) -> Result<TimeInfoConn<S>, HttpClientError>
189     where
190         C: Connector<Stream = S>,
191     {
192         let semaphore = self.usable.acquire().await;
193         match self.exist_h1_conn(semaphore) {
194             H1ConnOption::Some(conn) => Ok(TimeInfoConn::new(conn, TimeGroup::default())),
195             H1ConnOption::None(permit) => {
196                 let stream = connector.connect(url, HttpVersion::Http1).await?;
197                 let time_group = take(stream.conn_data().time_group_mut());
198 
199                 let dispatcher = ConnDispatcher::http1(stream);
200                 let conn = self.dispatch_h1_conn(dispatcher, permit);
201                 Ok(TimeInfoConn::new(conn, time_group))
202             }
203         }
204     }
205 
206     #[cfg(feature = "http2")]
conn_h2<C>( &self, connector: Arc<C>, url: &Uri, config: H2Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,207     async fn conn_h2<C>(
208         &self,
209         connector: Arc<C>,
210         url: &Uri,
211         config: H2Config,
212     ) -> Result<TimeInfoConn<S>, HttpClientError>
213     where
214         C: Connector<Stream = S>,
215     {
216         // The lock `h2_occupation` is used to prevent multiple coroutines from sending
217         // Requests at the same time under concurrent conditions,
218         // resulting in the creation of multiple tcp connections
219         let mut lock = self.h2_conn.lock().await;
220 
221         if let Some(conn) = Self::exist_h2_conn(&mut lock) {
222             return Ok(TimeInfoConn::new(conn, TimeGroup::default()));
223         }
224         let stream = connector.connect(url, HttpVersion::Http2).await?;
225         let mut data = stream.conn_data();
226         let tls = if let Some(scheme) = url.scheme() {
227             *scheme == Scheme::HTTPS
228         } else {
229             false
230         };
231         match data.negotiate().alpn() {
232             None if tls => return err_from_msg!(Connect, "The peer does not support http/2."),
233             Some(protocol) if protocol != b"h2" => {
234                 return err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.")
235             }
236             _ => {}
237         }
238         let time_group = take(data.time_group_mut());
239         let conn = Self::dispatch_h2_conn(data.detail(), config, stream, &mut lock);
240         Ok(TimeInfoConn::new(conn, time_group))
241     }
242 
243     #[cfg(feature = "http3")]
conn_h3<C>( &self, connector: Arc<C>, url: &Uri, config: H3Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,244     async fn conn_h3<C>(
245         &self,
246         connector: Arc<C>,
247         url: &Uri,
248         config: H3Config,
249     ) -> Result<TimeInfoConn<S>, HttpClientError>
250     where
251         C: Connector<Stream = S>,
252     {
253         let mut lock = self.h3_conn.lock().await;
254 
255         if let Some(conn) = Self::exist_h3_conn(&mut lock) {
256             return Ok(TimeInfoConn::new(conn, TimeGroup::default()));
257         }
258         let mut stream = connector.connect(url, HttpVersion::Http3).await?;
259 
260         let quic_conn = stream.quic_conn().ok_or(HttpClientError::from_str(
261             crate::ErrorKind::Connect,
262             "QUIC connect failed",
263         ))?;
264 
265         let mut data = stream.conn_data();
266         let time_group = take(data.time_group_mut());
267         Ok(TimeInfoConn::new(
268             Self::dispatch_h3_conn(data.detail(), config, stream, quic_conn, &mut lock),
269             time_group,
270         ))
271     }
272 
273     #[cfg(all(feature = "http2", feature = "http1_1"))]
conn_negotiate<C>( &self, connector: Arc<C>, url: &Uri, h2_config: H2Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,274     async fn conn_negotiate<C>(
275         &self,
276         connector: Arc<C>,
277         url: &Uri,
278         h2_config: H2Config,
279     ) -> Result<TimeInfoConn<S>, HttpClientError>
280     where
281         C: Connector<Stream = S>,
282     {
283         match *url.scheme().unwrap() {
284             Scheme::HTTPS => {
285                 let mut lock = self.h2_conn.lock().await;
286                 if let Some(conn) = Self::exist_h2_conn(&mut lock) {
287                     return Ok(TimeInfoConn::new(conn, TimeGroup::default()));
288                 }
289                 let permit = self.usable.acquire().await;
290                 let permit = match self.exist_h1_conn(permit) {
291                     H1ConnOption::Some(conn) => {
292                         return Ok(TimeInfoConn::new(conn, TimeGroup::default()));
293                     }
294                     H1ConnOption::None(permit) => permit,
295                 };
296                 let stream = connector.connect(url, HttpVersion::Negotiate).await?;
297                 let mut data = stream.conn_data();
298                 let time_group = take(data.time_group_mut());
299 
300                 let protocol = if let Some(bytes) = data.negotiate().alpn() {
301                     bytes
302                 } else {
303                     let dispatcher = ConnDispatcher::http1(stream);
304                     return Ok(TimeInfoConn::new(
305                         self.dispatch_h1_conn(dispatcher, permit),
306                         time_group,
307                     ));
308                 };
309 
310                 if protocol == b"http/1.1" {
311                     let dispatcher = ConnDispatcher::http1(stream);
312                     Ok(TimeInfoConn::new(
313                         self.dispatch_h1_conn(dispatcher, permit),
314                         time_group,
315                     ))
316                 } else if protocol == b"h2" {
317                     std::mem::drop(permit);
318                     let conn = Self::dispatch_h2_conn(data.detail(), h2_config, stream, &mut lock);
319                     Ok(TimeInfoConn::new(conn, time_group))
320                 } else {
321                     std::mem::drop(permit);
322                     err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.")
323                 }
324             }
325             Scheme::HTTP => self.conn_h1(connector, url).await,
326         }
327     }
328 
329     #[cfg(feature = "http3")]
conn_alt_svc<C>( &self, connector: &Arc<C>, url: &Uri, alt_svcs: Option<Vec<AltService>>, h3_config: H3Config, ) -> Option<TimeInfoConn<S>> where C: Connector<Stream = S>,330     async fn conn_alt_svc<C>(
331         &self,
332         connector: &Arc<C>,
333         url: &Uri,
334         alt_svcs: Option<Vec<AltService>>,
335         h3_config: H3Config,
336     ) -> Option<TimeInfoConn<S>>
337     where
338         C: Connector<Stream = S>,
339     {
340         let mut lock = self.h3_conn.lock().await;
341         if let Some(conn) = Self::exist_h3_conn(&mut lock) {
342             return Some(TimeInfoConn::new(conn, TimeGroup::default()));
343         }
344         if let Some(alt_svcs) = alt_svcs {
345             for alt_svc in alt_svcs {
346                 // only support h3 alt_svc now
347                 if alt_svc.http_version != HttpVersion::Http3 {
348                     continue;
349                 }
350                 let scheme = Scheme::HTTPS;
351                 let host = match alt_svc.host {
352                     Some(ref host) => host.clone(),
353                     None => url.host().cloned().unwrap(),
354                 };
355                 let port = alt_svc.port.clone();
356                 let authority =
357                     Authority::from_bytes((host.to_string() + ":" + port.as_str()).as_bytes())
358                         .ok()?;
359                 let path = url.path().cloned();
360                 let query = url.query().cloned();
361                 let alt_url = Uri::from_raw_parts(Some(scheme), Some(authority), path, query);
362                 let mut stream = connector.connect(&alt_url, HttpVersion::Http3).await.ok()?;
363                 let quic_conn = stream.quic_conn().unwrap();
364                 let mut data = stream.conn_data();
365                 let time_group = take(data.time_group_mut());
366                 return Some(TimeInfoConn::new(
367                     Self::dispatch_h3_conn(
368                         data.detail(),
369                         h3_config.clone(),
370                         stream,
371                         quic_conn,
372                         &mut lock,
373                     ),
374                     time_group,
375                 ));
376             }
377         }
378         None
379     }
380 
dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>, permit: WrappedSemPermit) -> Conn<S>381     fn dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>, permit: WrappedSemPermit) -> Conn<S> {
382         // We must be able to get the `Conn` here.
383         let mut conn = dispatcher.dispatch().unwrap();
384         let mut list = self.list.lock().unwrap();
385         list.push(dispatcher);
386         #[cfg(any(feature = "http2", feature = "http3"))]
387         if let Conn::Http1(ref mut h1) = conn {
388             h1.occupy_sem(permit)
389         }
390         #[cfg(all(not(feature = "http2"), not(feature = "http3")))]
391         {
392             let Conn::Http1(ref mut h1) = conn;
393             h1.occupy_sem(permit)
394         }
395         conn
396     }
397 
398     #[cfg(feature = "http2")]
dispatch_h2_conn( detail: ConnDetail, config: H2Config, stream: S, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Conn<S>399     fn dispatch_h2_conn(
400         detail: ConnDetail,
401         config: H2Config,
402         stream: S,
403         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
404     ) -> Conn<S> {
405         let dispatcher = ConnDispatcher::http2(detail, config, stream);
406         let conn = dispatcher.dispatch().unwrap();
407         lock.push(dispatcher);
408         conn
409     }
410 
411     #[cfg(feature = "http3")]
dispatch_h3_conn( detail: ConnDetail, config: H3Config, stream: S, quic_connection: QuicConn, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Conn<S>412     fn dispatch_h3_conn(
413         detail: ConnDetail,
414         config: H3Config,
415         stream: S,
416         quic_connection: QuicConn,
417         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
418     ) -> Conn<S> {
419         let dispatcher = ConnDispatcher::http3(detail, config, stream, quic_connection);
420         let conn = dispatcher.dispatch().unwrap();
421         lock.push(dispatcher);
422         conn
423     }
424 
exist_h1_conn(&self, permit: WrappedSemPermit) -> H1ConnOption<Conn<S>>425     fn exist_h1_conn(&self, permit: WrappedSemPermit) -> H1ConnOption<Conn<S>> {
426         let mut list = self.list.lock().unwrap();
427         let mut conn = None;
428         let curr = take(&mut *list);
429         // TODO Distinguish between http2 connections and http1 connections.
430         for dispatcher in curr.into_iter() {
431             // Discard invalid dispatchers.
432             if dispatcher.is_shutdown() {
433                 continue;
434             }
435             if conn.is_none() {
436                 conn = dispatcher.dispatch();
437             }
438             list.push(dispatcher);
439         }
440         match conn {
441             Some(Conn::Http1(mut h1)) => {
442                 h1.occupy_sem(permit);
443                 H1ConnOption::Some(Conn::Http1(h1))
444             }
445             _ => H1ConnOption::None(permit),
446         }
447     }
448 
449     #[cfg(feature = "http2")]
exist_h2_conn( lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Option<Conn<S>>450     fn exist_h2_conn(
451         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
452     ) -> Option<Conn<S>> {
453         if let Some(dispatcher) = lock.pop() {
454             if dispatcher.is_shutdown() {
455                 return None;
456             }
457             if !dispatcher.is_goaway() {
458                 if let Some(conn) = dispatcher.dispatch() {
459                     lock.push(dispatcher);
460                     return Some(conn);
461                 }
462             }
463             lock.push(dispatcher);
464         }
465         None
466     }
467 
468     #[cfg(feature = "http3")]
exist_h3_conn( lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Option<Conn<S>>469     fn exist_h3_conn(
470         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
471     ) -> Option<Conn<S>> {
472         if let Some(dispatcher) = lock.pop() {
473             if dispatcher.is_shutdown() {
474                 return None;
475             }
476             if !dispatcher.is_goaway() {
477                 if let Some(conn) = dispatcher.dispatch() {
478                     lock.push(dispatcher);
479                     return Some(conn);
480                 }
481             }
482             // Not all requests have been processed yet
483             lock.push(dispatcher);
484         }
485         None
486     }
487 }
488