• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::{grpc_timeout::GrpcTimeout, reconnect::Reconnect, AddOrigin, UserAgent};
2 use crate::{
3     body::BoxBody,
4     transport::{BoxFuture, Endpoint},
5 };
6 use http::Uri;
7 use hyper::client::conn::Builder;
8 use hyper::client::connect::Connection as HyperConnection;
9 use hyper::client::service::Connect as HyperConnect;
10 use std::{
11     fmt,
12     task::{Context, Poll},
13 };
14 use tokio::io::{AsyncRead, AsyncWrite};
15 use tower::load::Load;
16 use tower::{
17     layer::Layer,
18     limit::{concurrency::ConcurrencyLimitLayer, rate::RateLimitLayer},
19     util::BoxService,
20     ServiceBuilder, ServiceExt,
21 };
22 use tower_service::Service;
23 
24 pub(crate) type Request = http::Request<BoxBody>;
25 pub(crate) type Response = http::Response<hyper::Body>;
26 
27 pub(crate) struct Connection {
28     inner: BoxService<Request, Response, crate::Error>,
29 }
30 
31 impl Connection {
new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,32     fn new<C>(connector: C, endpoint: Endpoint, is_lazy: bool) -> Self
33     where
34         C: Service<Uri> + Send + 'static,
35         C::Error: Into<crate::Error> + Send,
36         C::Future: Unpin + Send,
37         C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
38     {
39         let mut settings = Builder::new()
40             .http2_initial_stream_window_size(endpoint.init_stream_window_size)
41             .http2_initial_connection_window_size(endpoint.init_connection_window_size)
42             .http2_only(true)
43             .http2_keep_alive_interval(endpoint.http2_keep_alive_interval)
44             .executor(endpoint.executor.clone())
45             .clone();
46 
47         if let Some(val) = endpoint.http2_keep_alive_timeout {
48             settings.http2_keep_alive_timeout(val);
49         }
50 
51         if let Some(val) = endpoint.http2_keep_alive_while_idle {
52             settings.http2_keep_alive_while_idle(val);
53         }
54 
55         if let Some(val) = endpoint.http2_adaptive_window {
56             settings.http2_adaptive_window(val);
57         }
58 
59         let stack = ServiceBuilder::new()
60             .layer_fn(|s| {
61                 let origin = endpoint.origin.as_ref().unwrap_or(&endpoint.uri).clone();
62 
63                 AddOrigin::new(s, origin)
64             })
65             .layer_fn(|s| UserAgent::new(s, endpoint.user_agent.clone()))
66             .layer_fn(|s| GrpcTimeout::new(s, endpoint.timeout))
67             .option_layer(endpoint.concurrency_limit.map(ConcurrencyLimitLayer::new))
68             .option_layer(endpoint.rate_limit.map(|(l, d)| RateLimitLayer::new(l, d)))
69             .into_inner();
70 
71         let connector = HyperConnect::new(connector, settings);
72         let conn = Reconnect::new(connector, endpoint.uri.clone(), is_lazy);
73 
74         let inner = stack.layer(conn);
75 
76         Self {
77             inner: BoxService::new(inner),
78         }
79     }
80 
connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error> where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,81     pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, crate::Error>
82     where
83         C: Service<Uri> + Send + 'static,
84         C::Error: Into<crate::Error> + Send,
85         C::Future: Unpin + Send,
86         C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
87     {
88         Self::new(connector, endpoint, false).ready_oneshot().await
89     }
90 
lazy<C>(connector: C, endpoint: Endpoint) -> Self where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,91     pub(crate) fn lazy<C>(connector: C, endpoint: Endpoint) -> Self
92     where
93         C: Service<Uri> + Send + 'static,
94         C::Error: Into<crate::Error> + Send,
95         C::Future: Unpin + Send,
96         C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
97     {
98         Self::new(connector, endpoint, true)
99     }
100 }
101 
102 impl Service<Request> for Connection {
103     type Response = Response;
104     type Error = crate::Error;
105     type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
106 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>107     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
108         Service::poll_ready(&mut self.inner, cx).map_err(Into::into)
109     }
110 
call(&mut self, req: Request) -> Self::Future111     fn call(&mut self, req: Request) -> Self::Future {
112         self.inner.call(req)
113     }
114 }
115 
116 impl Load for Connection {
117     type Metric = usize;
118 
load(&self) -> Self::Metric119     fn load(&self) -> Self::Metric {
120         0
121     }
122 }
123 
124 impl fmt::Debug for Connection {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result125     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
126         f.debug_struct("Connection").finish()
127     }
128 }
129