// Copyright (c) 2023 Huawei Device Co., Ltd.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
13 
14 use std::mem::take;
15 use std::sync::{Arc, Mutex};
16 use std::time::Instant;
17 
18 #[cfg(feature = "http3")]
19 use ylong_http::request::uri::Authority;
20 #[cfg(any(feature = "http2", feature = "http3"))]
21 use ylong_http::request::uri::Scheme;
22 use ylong_http::request::uri::Uri;
23 
24 #[cfg(feature = "http3")]
25 use crate::async_impl::quic::QuicConn;
26 use crate::async_impl::Connector;
27 #[cfg(feature = "http3")]
28 use crate::async_impl::Response;
29 use crate::error::HttpClientError;
30 use crate::runtime::{AsyncRead, AsyncWrite};
31 #[cfg(feature = "http3")]
32 use crate::util::alt_svc::{AltService, AltServiceMap};
33 #[cfg(feature = "http2")]
34 use crate::util::config::H2Config;
35 #[cfg(feature = "http3")]
36 use crate::util::config::H3Config;
37 use crate::util::config::{HttpConfig, HttpVersion};
38 use crate::util::dispatcher::http1::{WrappedSemPermit, WrappedSemaphore};
39 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher, TimeInfoConn};
40 use crate::util::pool::{Pool, PoolKey};
41 use crate::util::progress::SpeedConfig;
42 #[cfg(feature = "http3")]
43 use crate::util::request::RequestArc;
44 use crate::util::ConnInfo;
45 #[cfg(feature = "http2")]
46 use crate::ConnDetail;
47 use crate::TimeGroup;
48 
49 pub(crate) struct ConnPool<C, S> {
50     pool: Pool<PoolKey, Conns<S>>,
51     #[cfg(feature = "http3")]
52     alt_svcs: AltServiceMap,
53     connector: Arc<C>,
54     config: HttpConfig,
55 }
56 
57 impl<C: Connector> ConnPool<C, C::Stream> {
new(config: HttpConfig, connector: C) -> Self58     pub(crate) fn new(config: HttpConfig, connector: C) -> Self {
59         Self {
60             pool: Pool::new(),
61             #[cfg(feature = "http3")]
62             alt_svcs: AltServiceMap::new(),
63             connector: Arc::new(connector),
64             config,
65         }
66     }
67 
connect_to( &self, uri: &Uri, ) -> Result<TimeInfoConn<C::Stream>, HttpClientError>68     pub(crate) async fn connect_to(
69         &self,
70         uri: &Uri,
71     ) -> Result<TimeInfoConn<C::Stream>, HttpClientError> {
72         let key = PoolKey::new(
73             uri.scheme().unwrap().clone(),
74             uri.authority().unwrap().clone(),
75         );
76 
77         #[cfg(feature = "http3")]
78         let alt_svc = self.alt_svcs.get_alt_svcs(&key);
79         self.pool
80             .get(
81                 key,
82                 Conns::new,
83                 self.config.http1_config.max_conn_num(),
84                 self.config.speed_config,
85             )
86             .conn(
87                 self.config.clone(),
88                 self.connector.clone(),
89                 uri,
90                 #[cfg(feature = "http3")]
91                 alt_svc,
92             )
93             .await
94     }
95 
96     #[cfg(feature = "http3")]
set_alt_svcs(&self, request: RequestArc, response: &Response)97     pub(crate) fn set_alt_svcs(&self, request: RequestArc, response: &Response) {
98         self.alt_svcs.set_alt_svcs(request, response);
99     }
100 }
101 
102 pub(crate) enum H1ConnOption<T> {
103     Some(T),
104     None(WrappedSemPermit),
105 }
106 
107 pub(crate) struct Conns<S> {
108     speed_config: SpeedConfig,
109     usable: WrappedSemaphore,
110     list: Arc<Mutex<Vec<ConnDispatcher<S>>>>,
111     #[cfg(feature = "http2")]
112     h2_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>,
113     #[cfg(feature = "http3")]
114     h3_conn: Arc<crate::runtime::AsyncMutex<Vec<ConnDispatcher<S>>>>,
115 }
116 
117 impl<S> Conns<S> {
new(max_conn_num: usize, speed_config: SpeedConfig) -> Self118     fn new(max_conn_num: usize, speed_config: SpeedConfig) -> Self {
119         Self {
120             speed_config,
121             usable: WrappedSemaphore::new(max_conn_num),
122 
123             list: Arc::new(Mutex::new(Vec::new())),
124 
125             #[cfg(feature = "http2")]
126             h2_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))),
127 
128             #[cfg(feature = "http3")]
129             h3_conn: Arc::new(crate::runtime::AsyncMutex::new(Vec::with_capacity(1))),
130         }
131     }
132 
133     // fn get_alt_svcs
134 }
135 
136 impl<S> Clone for Conns<S> {
clone(&self) -> Self137     fn clone(&self) -> Self {
138         Self {
139             speed_config: self.speed_config,
140             usable: self.usable.clone(),
141             list: self.list.clone(),
142 
143             #[cfg(feature = "http2")]
144             h2_conn: self.h2_conn.clone(),
145 
146             #[cfg(feature = "http3")]
147             h3_conn: self.h3_conn.clone(),
148         }
149     }
150 }
151 
152 impl<S: AsyncRead + AsyncWrite + ConnInfo + Unpin + Send + Sync + 'static> Conns<S> {
conn<C>( &mut self, config: HttpConfig, connector: Arc<C>, url: &Uri, #[cfg(feature = "http3")] alt_svc: Option<Vec<AltService>>, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,153     async fn conn<C>(
154         &mut self,
155         config: HttpConfig,
156         connector: Arc<C>,
157         url: &Uri,
158         #[cfg(feature = "http3")] alt_svc: Option<Vec<AltService>>,
159     ) -> Result<TimeInfoConn<S>, HttpClientError>
160     where
161         C: Connector<Stream = S>,
162     {
163         let conn_start = Instant::now();
164         let mut conn = match config.version {
165             #[cfg(feature = "http3")]
166             HttpVersion::Http3 => self.conn_h3(connector, url, config.http3_config).await,
167             #[cfg(feature = "http2")]
168             HttpVersion::Http2 => self.conn_h2(connector, url, config.http2_config).await,
169             #[cfg(feature = "http1_1")]
170             HttpVersion::Http1 => self.conn_h1(connector, url).await,
171             #[cfg(all(feature = "http1_1", not(feature = "http2")))]
172             HttpVersion::Negotiate => self.conn_h1(connector, url).await,
173             #[cfg(all(feature = "http1_1", feature = "http2"))]
174             HttpVersion::Negotiate => {
175                 #[cfg(feature = "http3")]
176                 if let Some(mut conn) = self
177                     .conn_alt_svc(&connector, url, alt_svc, config.http3_config)
178                     .await
179                 {
180                     conn.time_group_mut().set_connect_start(conn_start);
181                     conn.time_group_mut().set_connect_end(Instant::now());
182                     return Ok(conn);
183                 }
184                 self.conn_negotiate(connector, url, config.http2_config)
185                     .await
186             }
187         }?;
188         conn.time_group_mut().set_connect_start(conn_start);
189         conn.time_group_mut().set_connect_end(Instant::now());
190         Ok(conn)
191     }
192 
conn_h1<C>( &self, connector: Arc<C>, url: &Uri, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,193     async fn conn_h1<C>(
194         &self,
195         connector: Arc<C>,
196         url: &Uri,
197     ) -> Result<TimeInfoConn<S>, HttpClientError>
198     where
199         C: Connector<Stream = S>,
200     {
201         let semaphore = self.usable.acquire().await;
202         match self.exist_h1_conn(semaphore) {
203             H1ConnOption::Some(conn) => Ok(TimeInfoConn::new(conn, TimeGroup::default())),
204             H1ConnOption::None(permit) => {
205                 let stream = connector.connect(url, HttpVersion::Http1).await?;
206                 let time_group = take(stream.conn_data().time_group_mut());
207 
208                 let dispatcher = ConnDispatcher::http1(stream);
209                 let conn = self.dispatch_h1_conn(dispatcher, permit);
210                 Ok(TimeInfoConn::new(conn, time_group))
211             }
212         }
213     }
214 
215     #[cfg(feature = "http2")]
conn_h2<C>( &self, connector: Arc<C>, url: &Uri, config: H2Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,216     async fn conn_h2<C>(
217         &self,
218         connector: Arc<C>,
219         url: &Uri,
220         config: H2Config,
221     ) -> Result<TimeInfoConn<S>, HttpClientError>
222     where
223         C: Connector<Stream = S>,
224     {
225         // The lock `h2_occupation` is used to prevent multiple coroutines from sending
226         // Requests at the same time under concurrent conditions,
227         // resulting in the creation of multiple tcp connections
228         let mut lock = self.h2_conn.lock().await;
229 
230         if let Some(conn) = self.exist_h2_conn(&mut lock) {
231             return Ok(TimeInfoConn::new(conn, TimeGroup::default()));
232         }
233         let stream = connector.connect(url, HttpVersion::Http2).await?;
234         let mut data = stream.conn_data();
235         let tls = if let Some(scheme) = url.scheme() {
236             *scheme == Scheme::HTTPS
237         } else {
238             false
239         };
240         match data.negotiate().alpn() {
241             None if tls => return err_from_msg!(Connect, "The peer does not support http/2."),
242             Some(protocol) if protocol != b"h2" => {
243                 return err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.")
244             }
245             _ => {}
246         }
247         let time_group = take(data.time_group_mut());
248         let conn = self.dispatch_h2_conn(data.detail(), config, stream, &mut lock);
249         Ok(TimeInfoConn::new(conn, time_group))
250     }
251 
252     #[cfg(feature = "http3")]
conn_h3<C>( &self, connector: Arc<C>, url: &Uri, config: H3Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,253     async fn conn_h3<C>(
254         &self,
255         connector: Arc<C>,
256         url: &Uri,
257         config: H3Config,
258     ) -> Result<TimeInfoConn<S>, HttpClientError>
259     where
260         C: Connector<Stream = S>,
261     {
262         let mut lock = self.h3_conn.lock().await;
263 
264         if let Some(conn) = self.exist_h3_conn(&mut lock) {
265             return Ok(TimeInfoConn::new(conn, TimeGroup::default()));
266         }
267         let mut stream = connector.connect(url, HttpVersion::Http3).await?;
268 
269         let quic_conn = stream.quic_conn().ok_or(HttpClientError::from_str(
270             crate::ErrorKind::Connect,
271             "QUIC connect failed",
272         ))?;
273 
274         let mut data = stream.conn_data();
275         let time_group = take(data.time_group_mut());
276         Ok(TimeInfoConn::new(
277             self.dispatch_h3_conn(data.detail(), config, stream, quic_conn, &mut lock),
278             time_group,
279         ))
280     }
281 
282     #[cfg(all(feature = "http2", feature = "http1_1"))]
conn_negotiate<C>( &self, connector: Arc<C>, url: &Uri, h2_config: H2Config, ) -> Result<TimeInfoConn<S>, HttpClientError> where C: Connector<Stream = S>,283     async fn conn_negotiate<C>(
284         &self,
285         connector: Arc<C>,
286         url: &Uri,
287         h2_config: H2Config,
288     ) -> Result<TimeInfoConn<S>, HttpClientError>
289     where
290         C: Connector<Stream = S>,
291     {
292         match *url.scheme().unwrap() {
293             Scheme::HTTPS => {
294                 let mut lock = self.h2_conn.lock().await;
295                 if let Some(conn) = self.exist_h2_conn(&mut lock) {
296                     return Ok(TimeInfoConn::new(conn, TimeGroup::default()));
297                 }
298                 let permit = self.usable.acquire().await;
299                 let permit = match self.exist_h1_conn(permit) {
300                     H1ConnOption::Some(conn) => {
301                         return Ok(TimeInfoConn::new(conn, TimeGroup::default()));
302                     }
303                     H1ConnOption::None(permit) => permit,
304                 };
305                 let stream = connector.connect(url, HttpVersion::Negotiate).await?;
306                 let mut data = stream.conn_data();
307                 let time_group = take(data.time_group_mut());
308 
309                 let protocol = data.negotiate().alpn().unwrap_or(b"http/1.1");
310                 if protocol == b"http/1.1" {
311                     let dispatcher = ConnDispatcher::http1(stream);
312                     Ok(TimeInfoConn::new(
313                         self.dispatch_h1_conn(dispatcher, permit),
314                         time_group,
315                     ))
316                 } else if protocol == b"h2" {
317                     std::mem::drop(permit);
318                     let conn = self.dispatch_h2_conn(data.detail(), h2_config, stream, &mut lock);
319                     Ok(TimeInfoConn::new(conn, time_group))
320                 } else {
321                     std::mem::drop(permit);
322                     err_from_msg!(Connect, "Alpn negotiate a wrong protocol version.")
323                 }
324             }
325             Scheme::HTTP => self.conn_h1(connector, url).await,
326         }
327     }
328 
329     #[cfg(feature = "http3")]
conn_alt_svc<C>( &self, connector: &Arc<C>, url: &Uri, alt_svcs: Option<Vec<AltService>>, h3_config: H3Config, ) -> Option<TimeInfoConn<S>> where C: Connector<Stream = S>,330     async fn conn_alt_svc<C>(
331         &self,
332         connector: &Arc<C>,
333         url: &Uri,
334         alt_svcs: Option<Vec<AltService>>,
335         h3_config: H3Config,
336     ) -> Option<TimeInfoConn<S>>
337     where
338         C: Connector<Stream = S>,
339     {
340         let mut lock = self.h3_conn.lock().await;
341         if let Some(conn) = self.exist_h3_conn(&mut lock) {
342             return Some(TimeInfoConn::new(conn, TimeGroup::default()));
343         }
344         if let Some(alt_svcs) = alt_svcs {
345             for alt_svc in alt_svcs {
346                 // only support h3 alt_svc now
347                 if alt_svc.http_version != HttpVersion::Http3 {
348                     continue;
349                 }
350                 let scheme = Scheme::HTTPS;
351                 let host = match alt_svc.host {
352                     Some(ref host) => host.clone(),
353                     None => url.host().cloned().unwrap(),
354                 };
355                 let port = alt_svc.port.clone();
356                 let authority =
357                     Authority::from_bytes((host.to_string() + ":" + port.as_str()).as_bytes())
358                         .ok()?;
359                 let path = url.path().cloned();
360                 let query = url.query().cloned();
361                 let alt_url = Uri::from_raw_parts(Some(scheme), Some(authority), path, query);
362                 let mut stream = connector.connect(&alt_url, HttpVersion::Http3).await.ok()?;
363                 let quic_conn = stream.quic_conn().unwrap();
364                 let mut data = stream.conn_data();
365                 let time_group = take(data.time_group_mut());
366                 return Some(TimeInfoConn::new(
367                     self.dispatch_h3_conn(
368                         data.detail(),
369                         h3_config.clone(),
370                         stream,
371                         quic_conn,
372                         &mut lock,
373                     ),
374                     time_group,
375                 ));
376             }
377         }
378         None
379     }
380 
dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>, permit: WrappedSemPermit) -> Conn<S>381     fn dispatch_h1_conn(&self, dispatcher: ConnDispatcher<S>, permit: WrappedSemPermit) -> Conn<S> {
382         // We must be able to get the `Conn` here.
383         let mut conn = dispatcher.dispatch().unwrap();
384         let mut list = self.list.lock().unwrap();
385         list.push(dispatcher);
386         #[cfg(any(feature = "http2", feature = "http3"))]
387         if let Conn::Http1(ref mut h1) = conn {
388             h1.speed_controller.set_speed_limit(self.speed_config);
389             h1.occupy_sem(permit)
390         }
391         #[cfg(all(not(feature = "http2"), not(feature = "http3")))]
392         {
393             let Conn::Http1(ref mut h1) = conn;
394             h1.speed_controller.set_speed_limit(self.speed_config);
395             h1.occupy_sem(permit)
396         }
397         conn
398     }
399 
400     #[cfg(feature = "http2")]
dispatch_h2_conn( &self, detail: ConnDetail, config: H2Config, stream: S, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Conn<S>401     fn dispatch_h2_conn(
402         &self,
403         detail: ConnDetail,
404         config: H2Config,
405         stream: S,
406         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
407     ) -> Conn<S> {
408         let dispatcher = ConnDispatcher::http2(detail, config, stream);
409         let mut conn = dispatcher.dispatch().unwrap();
410         lock.push(dispatcher);
411         if let Conn::Http2(ref mut h2) = conn {
412             h2.speed_controller.set_speed_limit(self.speed_config);
413         }
414         conn
415     }
416 
417     #[cfg(feature = "http3")]
dispatch_h3_conn( &self, detail: ConnDetail, config: H3Config, stream: S, quic_connection: QuicConn, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Conn<S>418     fn dispatch_h3_conn(
419         &self,
420         detail: ConnDetail,
421         config: H3Config,
422         stream: S,
423         quic_connection: QuicConn,
424         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
425     ) -> Conn<S> {
426         let dispatcher = ConnDispatcher::http3(detail, config, stream, quic_connection);
427         let mut conn = dispatcher.dispatch().unwrap();
428         lock.push(dispatcher);
429         if let Conn::Http3(ref mut h3) = conn {
430             h3.speed_controller.set_speed_limit(self.speed_config);
431         }
432         conn
433     }
434 
exist_h1_conn(&self, permit: WrappedSemPermit) -> H1ConnOption<Conn<S>>435     fn exist_h1_conn(&self, permit: WrappedSemPermit) -> H1ConnOption<Conn<S>> {
436         let mut list = self.list.lock().unwrap();
437         let mut conn = None;
438         let curr = take(&mut *list);
439         // TODO Distinguish between http2 connections and http1 connections.
440         for dispatcher in curr.into_iter() {
441             // Discard invalid dispatchers.
442             if dispatcher.is_shutdown() {
443                 continue;
444             }
445             if conn.is_none() {
446                 conn = dispatcher.dispatch();
447             }
448             list.push(dispatcher);
449         }
450         match conn {
451             Some(Conn::Http1(mut h1)) => {
452                 h1.occupy_sem(permit);
453                 h1.speed_controller.set_speed_limit(self.speed_config);
454                 H1ConnOption::Some(Conn::Http1(h1))
455             }
456             _ => H1ConnOption::None(permit),
457         }
458     }
459 
460     #[cfg(feature = "http2")]
exist_h2_conn( &self, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Option<Conn<S>>461     fn exist_h2_conn(
462         &self,
463         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
464     ) -> Option<Conn<S>> {
465         if let Some(dispatcher) = lock.pop() {
466             if dispatcher.is_shutdown() {
467                 return None;
468             }
469             if !dispatcher.is_goaway() {
470                 if let Some(Conn::Http2(mut h2)) = dispatcher.dispatch() {
471                     lock.push(dispatcher);
472                     h2.speed_controller.set_speed_limit(self.speed_config);
473                     return Some(Conn::Http2(h2));
474                 }
475             }
476             lock.push(dispatcher);
477         }
478         None
479     }
480 
481     #[cfg(feature = "http3")]
exist_h3_conn( &self, lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>, ) -> Option<Conn<S>>482     fn exist_h3_conn(
483         &self,
484         lock: &mut crate::runtime::MutexGuard<Vec<ConnDispatcher<S>>>,
485     ) -> Option<Conn<S>> {
486         if let Some(dispatcher) = lock.pop() {
487             if dispatcher.is_shutdown() {
488                 return None;
489             }
490             if !dispatcher.is_goaway() {
491                 if let Some(Conn::Http3(mut h3)) = dispatcher.dispatch() {
492                     lock.push(dispatcher);
493                     h3.speed_controller.set_speed_limit(self.speed_config);
494                     return Some(Conn::Http3(h3));
495                 }
496             }
497             // Not all requests have been processed yet
498             lock.push(dispatcher);
499         }
500         None
501     }
502 }
503