• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::error::Error;
15 use std::io::{Read, Write};
16 use std::mem::take;
17 use std::sync::{Arc, Mutex};
18 
19 use crate::error::{ErrorKind, HttpClientError};
20 use crate::sync_impl::Connector;
21 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher};
22 use crate::util::pool::{Pool, PoolKey};
23 use crate::Uri;
24 
25 pub(crate) struct ConnPool<C, S> {
26     pool: Pool<PoolKey, Conns<S>>,
27     connector: Arc<C>,
28 }
29 
30 impl<C: Connector> ConnPool<C, C::Stream> {
new(connector: C) -> Self31     pub(crate) fn new(connector: C) -> Self {
32         Self {
33             pool: Pool::new(),
34             connector: Arc::new(connector),
35         }
36     }
37 
connect_to(&self, uri: Uri) -> Result<Conn<C::Stream>, HttpClientError>38     pub(crate) fn connect_to(&self, uri: Uri) -> Result<Conn<C::Stream>, HttpClientError> {
39         let key = PoolKey::new(
40             uri.scheme().unwrap().clone(),
41             uri.authority().unwrap().clone(),
42         );
43 
44         self.pool
45             .get(key, Conns::new)
46             .conn(|| self.connector.clone().connect(&uri))
47     }
48 }
49 
50 pub(crate) struct Conns<S> {
51     list: Arc<Mutex<Vec<ConnDispatcher<S>>>>,
52 }
53 
54 impl<S> Conns<S> {
new() -> Self55     fn new() -> Self {
56         Self {
57             list: Arc::new(Mutex::new(Vec::new())),
58         }
59     }
60 }
61 
62 impl<S> Clone for Conns<S> {
clone(&self) -> Self63     fn clone(&self) -> Self {
64         Self {
65             list: self.list.clone(),
66         }
67     }
68 }
69 
70 impl<S: Read + Write + 'static> Conns<S> {
conn<F, E>(&self, connect_fn: F) -> Result<Conn<S>, HttpClientError> where F: FnOnce() -> Result<S, E>, E: Into<Box<dyn Error + Send + Sync>>,71     fn conn<F, E>(&self, connect_fn: F) -> Result<Conn<S>, HttpClientError>
72     where
73         F: FnOnce() -> Result<S, E>,
74         E: Into<Box<dyn Error + Send + Sync>>,
75     {
76         let mut list = self.list.lock().unwrap();
77         let mut conn = None;
78         let curr = take(&mut *list);
79         for dispatcher in curr.into_iter() {
80             // Discard invalid dispatchers.
81             if dispatcher.is_shutdown() {
82                 continue;
83             }
84             if conn.is_none() {
85                 conn = dispatcher.dispatch();
86             }
87             list.push(dispatcher);
88         }
89 
90         if let Some(conn) = conn {
91             Ok(conn)
92         } else {
93             let dispatcher = ConnDispatcher::http1(
94                 connect_fn()
95                     .map_err(|e| HttpClientError::new_with_cause(ErrorKind::Connect, Some(e)))?,
96             );
97             // We must be able to get the `Conn` here.
98             let conn = dispatcher.dispatch().unwrap();
99             list.push(dispatcher);
100             Ok(conn)
101         }
102     }
103 }
104