1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 use std::error::Error; 15 use std::io::{Read, Write}; 16 use std::mem::take; 17 use std::sync::{Arc, Mutex}; 18 19 use crate::error::{ErrorKind, HttpClientError}; 20 use crate::sync_impl::Connector; 21 use crate::util::dispatcher::{Conn, ConnDispatcher, Dispatcher}; 22 use crate::util::pool::{Pool, PoolKey}; 23 use crate::Uri; 24 25 pub(crate) struct ConnPool<C, S> { 26 pool: Pool<PoolKey, Conns<S>>, 27 connector: Arc<C>, 28 } 29 30 impl<C: Connector> ConnPool<C, C::Stream> { new(connector: C) -> Self31 pub(crate) fn new(connector: C) -> Self { 32 Self { 33 pool: Pool::new(), 34 connector: Arc::new(connector), 35 } 36 } 37 connect_to(&self, uri: Uri) -> Result<Conn<C::Stream>, HttpClientError>38 pub(crate) fn connect_to(&self, uri: Uri) -> Result<Conn<C::Stream>, HttpClientError> { 39 let key = PoolKey::new( 40 uri.scheme().unwrap().clone(), 41 uri.authority().unwrap().clone(), 42 ); 43 44 self.pool 45 .get(key, Conns::new) 46 .conn(|| self.connector.clone().connect(&uri)) 47 } 48 } 49 50 pub(crate) struct Conns<S> { 51 list: Arc<Mutex<Vec<ConnDispatcher<S>>>>, 52 } 53 54 impl<S> Conns<S> { new() -> Self55 fn new() -> Self { 56 Self { 57 list: Arc::new(Mutex::new(Vec::new())), 58 } 59 } 60 } 61 62 impl<S> Clone for Conns<S> { clone(&self) -> Self63 fn clone(&self) -> Self { 64 Self { 65 list: self.list.clone(), 66 } 67 } 68 } 69 70 impl<S: Read + Write + 'static> Conns<S> { conn<F, E>(&self, connect_fn: F) -> Result<Conn<S>, HttpClientError> where F: FnOnce() -> Result<S, E>, E: Into<Box<dyn Error + Send + Sync>>,71 fn conn<F, E>(&self, connect_fn: F) -> Result<Conn<S>, HttpClientError> 72 where 73 F: FnOnce() -> Result<S, E>, 74 E: Into<Box<dyn Error + Send + Sync>>, 75 { 76 let mut list = self.list.lock().unwrap(); 77 let mut conn = None; 78 let curr = take(&mut *list); 79 for dispatcher in curr.into_iter() { 80 // Discard invalid dispatchers. 81 if dispatcher.is_shutdown() { 82 continue; 83 } 84 if conn.is_none() { 85 conn = dispatcher.dispatch(); 86 } 87 list.push(dispatcher); 88 } 89 90 if let Some(conn) = conn { 91 Ok(conn) 92 } else { 93 let dispatcher = ConnDispatcher::http1( 94 connect_fn() 95 .map_err(|e| HttpClientError::new_with_cause(ErrorKind::Connect, Some(e)))?, 96 ); 97 // We must be able to get the `Conn` here. 98 let conn = dispatcher.dispatch().unwrap(); 99 list.push(dispatcher); 100 Ok(conn) 101 } 102 } 103 } 104