• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::super::service;
2 use super::Channel;
3 #[cfg(feature = "tls")]
4 use super::ClientTlsConfig;
5 #[cfg(feature = "tls")]
6 use crate::transport::service::TlsConnector;
7 use crate::transport::{service::SharedExec, Error, Executor};
8 use bytes::Bytes;
9 use http::{uri::Uri, HeaderValue};
10 use std::{fmt, future::Future, pin::Pin, str::FromStr, time::Duration};
11 use tower::make::MakeConnection;
12 // use crate::transport::E
13 
/// Channel builder.
///
/// This struct is used to build and configure HTTP/2 channels. Every setter
/// consumes the endpoint and returns the updated value, so calls are meant to
/// be chained.
#[derive(Clone)]
pub struct Endpoint {
    /// Target URI; exposed through [`Endpoint::uri`].
    pub(crate) uri: Uri,
    /// Origin override set by [`Endpoint::origin`]; `None` until configured.
    pub(crate) origin: Option<Uri>,
    /// Custom user-agent value set by [`Endpoint::user_agent`].
    pub(crate) user_agent: Option<HeaderValue>,
    /// Per-request timeout set by [`Endpoint::timeout`].
    pub(crate) timeout: Option<Duration>,
    /// Concurrency limit set by [`Endpoint::concurrency_limit`].
    pub(crate) concurrency_limit: Option<usize>,
    /// Rate limit as `(limit, duration)`, set by [`Endpoint::rate_limit`].
    pub(crate) rate_limit: Option<(u64, Duration)>,
    /// TLS connector produced by `Endpoint::tls_config`; cloned into every
    /// connector built from this endpoint.
    #[cfg(feature = "tls")]
    pub(crate) tls: Option<TlsConnector>,
    /// Internal buffer size set by [`Endpoint::buffer_size`].
    pub(crate) buffer_size: Option<usize>,
    /// HTTP/2 stream-level initial window size.
    pub(crate) init_stream_window_size: Option<u32>,
    /// HTTP/2 connection-level initial window size.
    pub(crate) init_connection_window_size: Option<u32>,
    /// TCP keepalive idle time handed to the HTTP connector; `None` disables it.
    pub(crate) tcp_keepalive: Option<Duration>,
    /// Value of `TCP_NODELAY` handed to the HTTP connector; initialised to `true`.
    pub(crate) tcp_nodelay: bool,
    /// HTTP/2 keep-alive interval set by [`Endpoint::http2_keep_alive_interval`].
    pub(crate) http2_keep_alive_interval: Option<Duration>,
    /// HTTP/2 keep-alive timeout set by [`Endpoint::keep_alive_timeout`].
    pub(crate) http2_keep_alive_timeout: Option<Duration>,
    /// HTTP/2 keep-alive-while-idle flag set by [`Endpoint::keep_alive_while_idle`].
    pub(crate) http2_keep_alive_while_idle: Option<bool>,
    /// Connection-establishment timeout; when set, connectors are wrapped in a
    /// `hyper_timeout::TimeoutConnector`.
    pub(crate) connect_timeout: Option<Duration>,
    /// Adaptive flow-control flag set by [`Endpoint::http2_adaptive_window`].
    pub(crate) http2_adaptive_window: Option<bool>,
    /// Executor used to spawn async tasks; initialised with `SharedExec::tokio()`.
    pub(crate) executor: SharedExec,
}
39 
40 impl Endpoint {
41     // FIXME: determine if we want to expose this or not. This is really
42     // just used in codegen for a shortcut.
43     #[doc(hidden)]
new<D>(dst: D) -> Result<Self, Error> where D: TryInto<Self>, D::Error: Into<crate::Error>,44     pub fn new<D>(dst: D) -> Result<Self, Error>
45     where
46         D: TryInto<Self>,
47         D::Error: Into<crate::Error>,
48     {
49         let me = dst.try_into().map_err(|e| Error::from_source(e.into()))?;
50         Ok(me)
51     }
52 
53     /// Convert an `Endpoint` from a static string.
54     ///
55     /// # Panics
56     ///
57     /// This function panics if the argument is an invalid URI.
58     ///
59     /// ```
60     /// # use tonic::transport::Endpoint;
61     /// Endpoint::from_static("https://example.com");
62     /// ```
from_static(s: &'static str) -> Self63     pub fn from_static(s: &'static str) -> Self {
64         let uri = Uri::from_static(s);
65         Self::from(uri)
66     }
67 
68     /// Convert an `Endpoint` from shared bytes.
69     ///
70     /// ```
71     /// # use tonic::transport::Endpoint;
72     /// Endpoint::from_shared("https://example.com".to_string());
73     /// ```
from_shared(s: impl Into<Bytes>) -> Result<Self, Error>74     pub fn from_shared(s: impl Into<Bytes>) -> Result<Self, Error> {
75         let uri = Uri::from_maybe_shared(s.into()).map_err(|e| Error::new_invalid_uri().with(e))?;
76         Ok(Self::from(uri))
77     }
78 
79     /// Set a custom user-agent header.
80     ///
81     /// `user_agent` will be prepended to Tonic's default user-agent string (`tonic/x.x.x`).
82     /// It must be a value that can be converted into a valid  `http::HeaderValue` or building
83     /// the endpoint will fail.
84     /// ```
85     /// # use tonic::transport::Endpoint;
86     /// # let mut builder = Endpoint::from_static("https://example.com");
87     /// builder.user_agent("Greeter").expect("Greeter should be a valid header value");
88     /// // user-agent: "Greeter tonic/x.x.x"
89     /// ```
user_agent<T>(self, user_agent: T) -> Result<Self, Error> where T: TryInto<HeaderValue>,90     pub fn user_agent<T>(self, user_agent: T) -> Result<Self, Error>
91     where
92         T: TryInto<HeaderValue>,
93     {
94         user_agent
95             .try_into()
96             .map(|ua| Endpoint {
97                 user_agent: Some(ua),
98                 ..self
99             })
100             .map_err(|_| Error::new_invalid_user_agent())
101     }
102 
103     /// Set a custom origin.
104     ///
105     /// Override the `origin`, mainly useful when you are reaching a Server/LoadBalancer
106     /// which serves multiple services at the same time.
107     /// It will play the role of SNI (Server Name Indication).
108     ///
109     /// ```
110     /// # use tonic::transport::Endpoint;
111     /// # let mut builder = Endpoint::from_static("https://proxy.com");
112     /// builder.origin("https://example.com".parse().expect("http://example.com must be a valid URI"));
113     /// // origin: "https://example.com"
114     /// ```
origin(self, origin: Uri) -> Self115     pub fn origin(self, origin: Uri) -> Self {
116         Endpoint {
117             origin: Some(origin),
118             ..self
119         }
120     }
121 
122     /// Apply a timeout to each request.
123     ///
124     /// ```
125     /// # use tonic::transport::Endpoint;
126     /// # use std::time::Duration;
127     /// # let mut builder = Endpoint::from_static("https://example.com");
128     /// builder.timeout(Duration::from_secs(5));
129     /// ```
130     ///
131     /// # Notes
132     ///
133     /// This does **not** set the timeout metadata (`grpc-timeout` header) on
134     /// the request, meaning the server will not be informed of this timeout,
135     /// for that use [`Request::set_timeout`].
136     ///
137     /// [`Request::set_timeout`]: crate::Request::set_timeout
timeout(self, dur: Duration) -> Self138     pub fn timeout(self, dur: Duration) -> Self {
139         Endpoint {
140             timeout: Some(dur),
141             ..self
142         }
143     }
144 
145     /// Apply a timeout to connecting to the uri.
146     ///
147     /// Defaults to no timeout.
148     ///
149     /// ```
150     /// # use tonic::transport::Endpoint;
151     /// # use std::time::Duration;
152     /// # let mut builder = Endpoint::from_static("https://example.com");
153     /// builder.connect_timeout(Duration::from_secs(5));
154     /// ```
connect_timeout(self, dur: Duration) -> Self155     pub fn connect_timeout(self, dur: Duration) -> Self {
156         Endpoint {
157             connect_timeout: Some(dur),
158             ..self
159         }
160     }
161 
162     /// Set whether TCP keepalive messages are enabled on accepted connections.
163     ///
164     /// If `None` is specified, keepalive is disabled, otherwise the duration
165     /// specified will be the time to remain idle before sending TCP keepalive
166     /// probes.
167     ///
168     /// Default is no keepalive (`None`)
169     ///
tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self170     pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
171         Endpoint {
172             tcp_keepalive,
173             ..self
174         }
175     }
176 
177     /// Apply a concurrency limit to each request.
178     ///
179     /// ```
180     /// # use tonic::transport::Endpoint;
181     /// # let mut builder = Endpoint::from_static("https://example.com");
182     /// builder.concurrency_limit(256);
183     /// ```
concurrency_limit(self, limit: usize) -> Self184     pub fn concurrency_limit(self, limit: usize) -> Self {
185         Endpoint {
186             concurrency_limit: Some(limit),
187             ..self
188         }
189     }
190 
191     /// Apply a rate limit to each request.
192     ///
193     /// ```
194     /// # use tonic::transport::Endpoint;
195     /// # use std::time::Duration;
196     /// # let mut builder = Endpoint::from_static("https://example.com");
197     /// builder.rate_limit(32, Duration::from_secs(1));
198     /// ```
rate_limit(self, limit: u64, duration: Duration) -> Self199     pub fn rate_limit(self, limit: u64, duration: Duration) -> Self {
200         Endpoint {
201             rate_limit: Some((limit, duration)),
202             ..self
203         }
204     }
205 
206     /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
207     /// stream-level flow control.
208     ///
209     /// Default is 65,535
210     ///
211     /// [spec]: https://httpwg.org/specs/rfc9113.html#InitialWindowSize
initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self212     pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
213         Endpoint {
214             init_stream_window_size: sz.into(),
215             ..self
216         }
217     }
218 
219     /// Sets the max connection-level flow control for HTTP2
220     ///
221     /// Default is 65,535
initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self222     pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
223         Endpoint {
224             init_connection_window_size: sz.into(),
225             ..self
226         }
227     }
228 
229     /// Sets the tower service default internal buffer size
230     ///
231     /// Default is 1024
buffer_size(self, sz: impl Into<Option<usize>>) -> Self232     pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
233         Endpoint {
234             buffer_size: sz.into(),
235             ..self
236         }
237     }
238 
239     /// Configures TLS for the endpoint.
240     #[cfg(feature = "tls")]
241     #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
tls_config(self, tls_config: ClientTlsConfig) -> Result<Self, Error>242     pub fn tls_config(self, tls_config: ClientTlsConfig) -> Result<Self, Error> {
243         Ok(Endpoint {
244             tls: Some(
245                 tls_config
246                     .tls_connector(self.uri.clone())
247                     .map_err(Error::from_source)?,
248             ),
249             ..self
250         })
251     }
252 
253     /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
tcp_nodelay(self, enabled: bool) -> Self254     pub fn tcp_nodelay(self, enabled: bool) -> Self {
255         Endpoint {
256             tcp_nodelay: enabled,
257             ..self
258         }
259     }
260 
261     /// Set http2 KEEP_ALIVE_INTERVAL. Uses `hyper`'s default otherwise.
http2_keep_alive_interval(self, interval: Duration) -> Self262     pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
263         Endpoint {
264             http2_keep_alive_interval: Some(interval),
265             ..self
266         }
267     }
268 
269     /// Set http2 KEEP_ALIVE_TIMEOUT. Uses `hyper`'s default otherwise.
keep_alive_timeout(self, duration: Duration) -> Self270     pub fn keep_alive_timeout(self, duration: Duration) -> Self {
271         Endpoint {
272             http2_keep_alive_timeout: Some(duration),
273             ..self
274         }
275     }
276 
277     /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses `hyper`'s default otherwise.
keep_alive_while_idle(self, enabled: bool) -> Self278     pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
279         Endpoint {
280             http2_keep_alive_while_idle: Some(enabled),
281             ..self
282         }
283     }
284 
285     /// Sets whether to use an adaptive flow control. Uses `hyper`'s default otherwise.
http2_adaptive_window(self, enabled: bool) -> Self286     pub fn http2_adaptive_window(self, enabled: bool) -> Self {
287         Endpoint {
288             http2_adaptive_window: Some(enabled),
289             ..self
290         }
291     }
292 
293     /// Sets the executor used to spawn async tasks.
294     ///
295     /// Uses `tokio::spawn` by default.
executor<E>(mut self, executor: E) -> Self where E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,296     pub fn executor<E>(mut self, executor: E) -> Self
297     where
298         E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
299     {
300         self.executor = SharedExec::new(executor);
301         self
302     }
303 
connector<C>(&self, c: C) -> service::Connector<C>304     pub(crate) fn connector<C>(&self, c: C) -> service::Connector<C> {
305         #[cfg(feature = "tls")]
306         let connector = service::Connector::new(c, self.tls.clone());
307 
308         #[cfg(not(feature = "tls"))]
309         let connector = service::Connector::new(c);
310 
311         connector
312     }
313 
314     /// Create a channel from this config.
connect(&self) -> Result<Channel, Error>315     pub async fn connect(&self) -> Result<Channel, Error> {
316         let mut http = hyper::client::connect::HttpConnector::new();
317         http.enforce_http(false);
318         http.set_nodelay(self.tcp_nodelay);
319         http.set_keepalive(self.tcp_keepalive);
320 
321         let connector = self.connector(http);
322 
323         if let Some(connect_timeout) = self.connect_timeout {
324             let mut connector = hyper_timeout::TimeoutConnector::new(connector);
325             connector.set_connect_timeout(Some(connect_timeout));
326             Channel::connect(connector, self.clone()).await
327         } else {
328             Channel::connect(connector, self.clone()).await
329         }
330     }
331 
332     /// Create a channel from this config.
333     ///
334     /// The channel returned by this method does not attempt to connect to the endpoint until first
335     /// use.
connect_lazy(&self) -> Channel336     pub fn connect_lazy(&self) -> Channel {
337         let mut http = hyper::client::connect::HttpConnector::new();
338         http.enforce_http(false);
339         http.set_nodelay(self.tcp_nodelay);
340         http.set_keepalive(self.tcp_keepalive);
341 
342         let connector = self.connector(http);
343 
344         if let Some(connect_timeout) = self.connect_timeout {
345             let mut connector = hyper_timeout::TimeoutConnector::new(connector);
346             connector.set_connect_timeout(Some(connect_timeout));
347             Channel::new(connector, self.clone())
348         } else {
349             Channel::new(connector, self.clone())
350         }
351     }
352 
353     /// Connect with a custom connector.
354     ///
355     /// This allows you to build a [Channel](struct.Channel.html) that uses a non-HTTP transport.
356     /// See the `uds` example for an example on how to use this function to build channel that
357     /// uses a Unix socket transport.
358     ///
359     /// The [`connect_timeout`](Endpoint::connect_timeout) will still be applied.
connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error> where C: MakeConnection<Uri> + Send + 'static, C::Connection: Unpin + Send + 'static, C::Future: Send + 'static, crate::Error: From<C::Error> + Send + 'static,360     pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
361     where
362         C: MakeConnection<Uri> + Send + 'static,
363         C::Connection: Unpin + Send + 'static,
364         C::Future: Send + 'static,
365         crate::Error: From<C::Error> + Send + 'static,
366     {
367         let connector = self.connector(connector);
368 
369         if let Some(connect_timeout) = self.connect_timeout {
370             let mut connector = hyper_timeout::TimeoutConnector::new(connector);
371             connector.set_connect_timeout(Some(connect_timeout));
372             Channel::connect(connector, self.clone()).await
373         } else {
374             Channel::connect(connector, self.clone()).await
375         }
376     }
377 
378     /// Connect with a custom connector lazily.
379     ///
380     /// This allows you to build a [Channel](struct.Channel.html) that uses a non-HTTP transport
381     /// connect to it lazily.
382     ///
383     /// See the `uds` example for an example on how to use this function to build channel that
384     /// uses a Unix socket transport.
connect_with_connector_lazy<C>(&self, connector: C) -> Channel where C: MakeConnection<Uri> + Send + 'static, C::Connection: Unpin + Send + 'static, C::Future: Send + 'static, crate::Error: From<C::Error> + Send + 'static,385     pub fn connect_with_connector_lazy<C>(&self, connector: C) -> Channel
386     where
387         C: MakeConnection<Uri> + Send + 'static,
388         C::Connection: Unpin + Send + 'static,
389         C::Future: Send + 'static,
390         crate::Error: From<C::Error> + Send + 'static,
391     {
392         let connector = self.connector(connector);
393         if let Some(connect_timeout) = self.connect_timeout {
394             let mut connector = hyper_timeout::TimeoutConnector::new(connector);
395             connector.set_connect_timeout(Some(connect_timeout));
396             Channel::new(connector, self.clone())
397         } else {
398             Channel::new(connector, self.clone())
399         }
400     }
401 
402     /// Get the endpoint uri.
403     ///
404     /// ```
405     /// # use tonic::transport::Endpoint;
406     /// # use http::Uri;
407     /// let endpoint = Endpoint::from_static("https://example.com");
408     ///
409     /// assert_eq!(endpoint.uri(), &Uri::from_static("https://example.com"));
410     /// ```
uri(&self) -> &Uri411     pub fn uri(&self) -> &Uri {
412         &self.uri
413     }
414 }
415 
416 impl From<Uri> for Endpoint {
from(uri: Uri) -> Self417     fn from(uri: Uri) -> Self {
418         Self {
419             uri,
420             origin: None,
421             user_agent: None,
422             concurrency_limit: None,
423             rate_limit: None,
424             timeout: None,
425             #[cfg(feature = "tls")]
426             tls: None,
427             buffer_size: None,
428             init_stream_window_size: None,
429             init_connection_window_size: None,
430             tcp_keepalive: None,
431             tcp_nodelay: true,
432             http2_keep_alive_interval: None,
433             http2_keep_alive_timeout: None,
434             http2_keep_alive_while_idle: None,
435             connect_timeout: None,
436             http2_adaptive_window: None,
437             executor: SharedExec::tokio(),
438         }
439     }
440 }
441 
442 impl TryFrom<Bytes> for Endpoint {
443     type Error = Error;
444 
try_from(t: Bytes) -> Result<Self, Self::Error>445     fn try_from(t: Bytes) -> Result<Self, Self::Error> {
446         Self::from_shared(t)
447     }
448 }
449 
450 impl TryFrom<String> for Endpoint {
451     type Error = Error;
452 
try_from(t: String) -> Result<Self, Self::Error>453     fn try_from(t: String) -> Result<Self, Self::Error> {
454         Self::from_shared(t.into_bytes())
455     }
456 }
457 
458 impl TryFrom<&'static str> for Endpoint {
459     type Error = Error;
460 
try_from(t: &'static str) -> Result<Self, Self::Error>461     fn try_from(t: &'static str) -> Result<Self, Self::Error> {
462         Self::from_shared(t.as_bytes())
463     }
464 }
465 
466 impl fmt::Debug for Endpoint {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result467     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
468         f.debug_struct("Endpoint").finish()
469     }
470 }
471 
472 impl FromStr for Endpoint {
473     type Err = Error;
474 
from_str(s: &str) -> Result<Self, Self::Err>475     fn from_str(s: &str) -> Result<Self, Self::Err> {
476         Self::try_from(s.to_string())
477     }
478 }
479