1 use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent}; 2 use crate::{ 3 body::BoxBody, 4 transport::{BoxFuture, Endpoint}, 5 }; 6 use http::Uri; 7 use hyper::client::conn::Builder; 8 use hyper::client::connect::Connection as HyperConnection; 9 use hyper::client::service::Connect as HyperConnect; 10 use std::{ 11 fmt, 12 task::{Context, Poll}, 13 }; 14 use tokio::io::{AsyncRead, AsyncWrite}; 15 use tower::load::Load; 16 use tower::{ 17 layer::Layer, 18 limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer}, 19 util::BoxService, 20 ServiceBuilder, ServiceExt, 21 }; 22 use tower_service::Service; 23 24 pub(crate) type Request = http::Request<BoxBody>; 25 pub(crate) type Response = http::Response<hyper::Body>; 26 27 pub(crate) struct Connection { 28 inner: BoxService<Request, Response, crate::Error>, 29 } 30 31 impl Connection { new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,32 fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self 33 where 34 C: Service<Uri> + Send + 'static, 35 C::Error: Into<crate::Error> + Send, 36 C::Future: Unpin + Send, 37 C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, 38 { 39 let mut settings = Builder::new() 40 .http2_initial_stream_window_size(endpoint.init_stream_window_size) 41 .http2_initial_connection_window_size(endpoint.init_connection_window_size) 42 .http2_only(true) 43 .http2_keep_alive_interval(endpoint.http2_keep_alive_interval) 44 .executor(endpoint.executor.clone()) 45 .clone(); 46 47 if let Some(val) = endpoint.http2_keep_alive_timeout { 48 settings.http2_keep_alive_timeout(val); 49 } 50 51 if let Some(val) = endpoint.http2_keep_alive_while_idle { 52 settings.http2_keep_alive_while_idle(val); 53 } 54 55 if let Some(val) = endpoint.http2_adaptive_window { 56 settings.http2_adaptive_window(val); 57 } 58 59 let stack = ServiceBuilder::new() 60 .layer_fn(|s| { 61 let origin = endpoint.origin.as_ref().unwrap_or(&endpoint.uri).clone(); 62 63 AddOrigin::new(s, origin) 64 }) 65 .layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone())) 66 .layer_fn(|s| GrpcTimeout::new(s, endpoint.timeout)) 67 .option_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new)) 68 .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d))) 69 .into_inner(); 70 71 let connector = HyperConnect::new(connector, settings); 72 let conn = Reconnect::new(connector, endpoint.uri.clone(), is_lazy); 73 74 let inner = stack.layer(conn); 75 76 Self { 77 inner: BoxService::new(inner), 78 } 79 } 80 connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error> where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,81 pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error> 82 where 83 C: Service<Uri> + Send + 'static, 84 C::Error: Into<crate::Error> + Send, 85 C::Future: Unpin + Send, 86 C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, 87 { 88 Self::new(connector, endpoint, false).ready_oneshot().await 89 } 90 lazy<C>(connector: C, endpoint: Endpoint) -> Self where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,91 pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self 92 where 93 C: Service<Uri> + Send + 'static, 94 C::Error: Into<crate::Error> + Send, 95 C::Future: Unpin + Send, 96 C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, 97 { 98 Self::new(connector, endpoint, true) 99 } 100 } 101 102 impl Service<Request> for Connection { 103 type Response = Response; 104 type Error = crate::Error; 105 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>; 106 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>107 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 108 Service::poll_ready(&mut self.inner, cx).map_err(Into::into) 109 } 110 call(&mut self, req: Request) -> Self::Future111 fn call(&mut self, req: Request) -> Self::Future { 112 self.inner.call(req) 113 } 114 } 115 116 impl Load for Connection { 117 type Metric = usize; 118 load(&self) -> Self::Metric119 fn load(&self) -> Self::Metric { 120 0 121 } 122 } 123 124 impl fmt::Debug for Connection { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 126 f.debug_struct("Connection").finish() 127 } 128 } 129