1 use super::super::service; 2 use super::Channel; 3 #[cfg(feature = "tls")] 4 use super::ClientTlsConfig; 5 #[cfg(feature = "tls")] 6 use crate::transport::service::TlsConnector; 7 use crate::transport::{service::SharedExec, Error, Executor}; 8 use bytes::Bytes; 9 use http::{uri::Uri, HeaderValue}; 10 use std::{fmt, future::Future, pin::Pin, str::FromStr, time::Duration}; 11 use tower::make::MakeConnection; 12 // use crate::transport::E 13 14 /// Channel builder. 15 /// 16 /// This struct is used to build and configure HTTP/2 channels. 17 #[derive(Clone)] 18 pub struct Endpoint { 19 pub(crate) uri: Uri, 20 pub(crate) origin: Option<Uri>, 21 pub(crate) user_agent: Option<HeaderValue>, 22 pub(crate) timeout: Option<Duration>, 23 pub(crate) concurrency_limit: Option<usize>, 24 pub(crate) rate_limit: Option<(u64, Duration)>, 25 #[cfg(feature = "tls")] 26 pub(crate) tls: Option<TlsConnector>, 27 pub(crate) buffer_size: Option<usize>, 28 pub(crate) init_stream_window_size: Option<u32>, 29 pub(crate) init_connection_window_size: Option<u32>, 30 pub(crate) tcp_keepalive: Option<Duration>, 31 pub(crate) tcp_nodelay: bool, 32 pub(crate) http2_keep_alive_interval: Option<Duration>, 33 pub(crate) http2_keep_alive_timeout: Option<Duration>, 34 pub(crate) http2_keep_alive_while_idle: Option<bool>, 35 pub(crate) connect_timeout: Option<Duration>, 36 pub(crate) http2_adaptive_window: Option<bool>, 37 pub(crate) executor: SharedExec, 38 } 39 40 impl Endpoint { 41 // FIXME: determine if we want to expose this or not. This is really 42 // just used in codegen for a shortcut. 43 #[doc(hidden)] new<D>(dst: D) -> Result<Self, Error> where D: TryInto<Self>, D::Error: Into<crate::Error>,44 pub fn new<D>(dst: D) -> Result<Self, Error> 45 where 46 D: TryInto<Self>, 47 D::Error: Into<crate::Error>, 48 { 49 let me = dst.try_into().map_err(|e| Error::from_source(e.into()))?; 50 Ok(me) 51 } 52 53 /// Convert an `Endpoint` from a static string. 54 /// 55 /// # Panics 56 /// 57 /// This function panics if the argument is an invalid URI. 58 /// 59 /// ``` 60 /// # use tonic::transport::Endpoint; 61 /// Endpoint::from_static("https://example.com"); 62 /// ``` from_static(s: &'static str) -> Self63 pub fn from_static(s: &'static str) -> Self { 64 let uri = Uri::from_static(s); 65 Self::from(uri) 66 } 67 68 /// Convert an `Endpoint` from shared bytes. 69 /// 70 /// ``` 71 /// # use tonic::transport::Endpoint; 72 /// Endpoint::from_shared("https://example.com".to_string()); 73 /// ``` from_shared(s: impl Into<Bytes>) -> Result<Self, Error>74 pub fn from_shared(s: impl Into<Bytes>) -> Result<Self, Error> { 75 let uri = Uri::from_maybe_shared(s.into()).map_err(|e| Error::new_invalid_uri().with(e))?; 76 Ok(Self::from(uri)) 77 } 78 79 /// Set a custom user-agent header. 80 /// 81 /// `user_agent` will be prepended to Tonic's default user-agent string (`tonic/x.x.x`). 82 /// It must be a value that can be converted into a valid `http::HeaderValue` or building 83 /// the endpoint will fail. 84 /// ``` 85 /// # use tonic::transport::Endpoint; 86 /// # let mut builder = Endpoint::from_static("https://example.com"); 87 /// builder.user_agent("Greeter").expect("Greeter should be a valid header value"); 88 /// // user-agent: "Greeter tonic/x.x.x" 89 /// ``` user_agent<T>(self, user_agent: T) -> Result<Self, Error> where T: TryInto<HeaderValue>,90 pub fn user_agent<T>(self, user_agent: T) -> Result<Self, Error> 91 where 92 T: TryInto<HeaderValue>, 93 { 94 user_agent 95 .try_into() 96 .map(|ua| Endpoint { 97 user_agent: Some(ua), 98 ..self 99 }) 100 .map_err(|_| Error::new_invalid_user_agent()) 101 } 102 103 /// Set a custom origin. 104 /// 105 /// Override the `origin`, mainly useful when you are reaching a Server/LoadBalancer 106 /// which serves multiple services at the same time. 107 /// It will play the role of SNI (Server Name Indication). 108 /// 109 /// ``` 110 /// # use tonic::transport::Endpoint; 111 /// # let mut builder = Endpoint::from_static("https://proxy.com"); 112 /// builder.origin("https://example.com".parse().expect("http://example.com must be a valid URI")); 113 /// // origin: "https://example.com" 114 /// ``` origin(self, origin: Uri) -> Self115 pub fn origin(self, origin: Uri) -> Self { 116 Endpoint { 117 origin: Some(origin), 118 ..self 119 } 120 } 121 122 /// Apply a timeout to each request. 123 /// 124 /// ``` 125 /// # use tonic::transport::Endpoint; 126 /// # use std::time::Duration; 127 /// # let mut builder = Endpoint::from_static("https://example.com"); 128 /// builder.timeout(Duration::from_secs(5)); 129 /// ``` 130 /// 131 /// # Notes 132 /// 133 /// This does **not** set the timeout metadata (`grpc-timeout` header) on 134 /// the request, meaning the server will not be informed of this timeout, 135 /// for that use [`Request::set_timeout`]. 136 /// 137 /// [`Request::set_timeout`]: crate::Request::set_timeout timeout(self, dur: Duration) -> Self138 pub fn timeout(self, dur: Duration) -> Self { 139 Endpoint { 140 timeout: Some(dur), 141 ..self 142 } 143 } 144 145 /// Apply a timeout to connecting to the uri. 146 /// 147 /// Defaults to no timeout. 148 /// 149 /// ``` 150 /// # use tonic::transport::Endpoint; 151 /// # use std::time::Duration; 152 /// # let mut builder = Endpoint::from_static("https://example.com"); 153 /// builder.connect_timeout(Duration::from_secs(5)); 154 /// ``` connect_timeout(self, dur: Duration) -> Self155 pub fn connect_timeout(self, dur: Duration) -> Self { 156 Endpoint { 157 connect_timeout: Some(dur), 158 ..self 159 } 160 } 161 162 /// Set whether TCP keepalive messages are enabled on accepted connections. 163 /// 164 /// If `None` is specified, keepalive is disabled, otherwise the duration 165 /// specified will be the time to remain idle before sending TCP keepalive 166 /// probes. 167 /// 168 /// Default is no keepalive (`None`) 169 /// tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self170 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self { 171 Endpoint { 172 tcp_keepalive, 173 ..self 174 } 175 } 176 177 /// Apply a concurrency limit to each request. 178 /// 179 /// ``` 180 /// # use tonic::transport::Endpoint; 181 /// # let mut builder = Endpoint::from_static("https://example.com"); 182 /// builder.concurrency_limit(256); 183 /// ``` concurrency_limit(self, limit: usize) -> Self184 pub fn concurrency_limit(self, limit: usize) -> Self { 185 Endpoint { 186 concurrency_limit: Some(limit), 187 ..self 188 } 189 } 190 191 /// Apply a rate limit to each request. 192 /// 193 /// ``` 194 /// # use tonic::transport::Endpoint; 195 /// # use std::time::Duration; 196 /// # let mut builder = Endpoint::from_static("https://example.com"); 197 /// builder.rate_limit(32, Duration::from_secs(1)); 198 /// ``` rate_limit(self, limit: u64, duration: Duration) -> Self199 pub fn rate_limit(self, limit: u64, duration: Duration) -> Self { 200 Endpoint { 201 rate_limit: Some((limit, duration)), 202 ..self 203 } 204 } 205 206 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 207 /// stream-level flow control. 208 /// 209 /// Default is 65,535 210 /// 211 /// [spec]: https://httpwg.org/specs/rfc9113.html#InitialWindowSize initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self212 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self { 213 Endpoint { 214 init_stream_window_size: sz.into(), 215 ..self 216 } 217 } 218 219 /// Sets the max connection-level flow control for HTTP2 220 /// 221 /// Default is 65,535 initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self222 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self { 223 Endpoint { 224 init_connection_window_size: sz.into(), 225 ..self 226 } 227 } 228 229 /// Sets the tower service default internal buffer size 230 /// 231 /// Default is 1024 buffer_size(self, sz: impl Into<Option<usize>>) -> Self232 pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self { 233 Endpoint { 234 buffer_size: sz.into(), 235 ..self 236 } 237 } 238 239 /// Configures TLS for the endpoint. 240 #[cfg(feature = "tls")] 241 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] tls_config(self, tls_config: ClientTlsConfig) -> Result<Self, Error>242 pub fn tls_config(self, tls_config: ClientTlsConfig) -> Result<Self, Error> { 243 Ok(Endpoint { 244 tls: Some( 245 tls_config 246 .tls_connector(self.uri.clone()) 247 .map_err(Error::from_source)?, 248 ), 249 ..self 250 }) 251 } 252 253 /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default. tcp_nodelay(self, enabled: bool) -> Self254 pub fn tcp_nodelay(self, enabled: bool) -> Self { 255 Endpoint { 256 tcp_nodelay: enabled, 257 ..self 258 } 259 } 260 261 /// Set http2 KEEP_ALIVE_INTERVAL. Uses `hyper`'s default otherwise. http2_keep_alive_interval(self, interval: Duration) -> Self262 pub fn http2_keep_alive_interval(self, interval: Duration) -> Self { 263 Endpoint { 264 http2_keep_alive_interval: Some(interval), 265 ..self 266 } 267 } 268 269 /// Set http2 KEEP_ALIVE_TIMEOUT. Uses `hyper`'s default otherwise. keep_alive_timeout(self, duration: Duration) -> Self270 pub fn keep_alive_timeout(self, duration: Duration) -> Self { 271 Endpoint { 272 http2_keep_alive_timeout: Some(duration), 273 ..self 274 } 275 } 276 277 /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses `hyper`'s default otherwise. keep_alive_while_idle(self, enabled: bool) -> Self278 pub fn keep_alive_while_idle(self, enabled: bool) -> Self { 279 Endpoint { 280 http2_keep_alive_while_idle: Some(enabled), 281 ..self 282 } 283 } 284 285 /// Sets whether to use an adaptive flow control. Uses `hyper`'s default otherwise. http2_adaptive_window(self, enabled: bool) -> Self286 pub fn http2_adaptive_window(self, enabled: bool) -> Self { 287 Endpoint { 288 http2_adaptive_window: Some(enabled), 289 ..self 290 } 291 } 292 293 /// Sets the executor used to spawn async tasks. 294 /// 295 /// Uses `tokio::spawn` by default. executor<E>(mut self, executor: E) -> Self where E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,296 pub fn executor<E>(mut self, executor: E) -> Self 297 where 298 E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static, 299 { 300 self.executor = SharedExec::new(executor); 301 self 302 } 303 connector<C>(&self, c: C) -> service::Connector<C>304 pub(crate) fn connector<C>(&self, c: C) -> service::Connector<C> { 305 #[cfg(feature = "tls")] 306 let connector = service::Connector::new(c, self.tls.clone()); 307 308 #[cfg(not(feature = "tls"))] 309 let connector = service::Connector::new(c); 310 311 connector 312 } 313 314 /// Create a channel from this config. connect(&self) -> Result<Channel, Error>315 pub async fn connect(&self) -> Result<Channel, Error> { 316 let mut http = hyper::client::connect::HttpConnector::new(); 317 http.enforce_http(false); 318 http.set_nodelay(self.tcp_nodelay); 319 http.set_keepalive(self.tcp_keepalive); 320 321 let connector = self.connector(http); 322 323 if let Some(connect_timeout) = self.connect_timeout { 324 let mut connector = hyper_timeout::TimeoutConnector::new(connector); 325 connector.set_connect_timeout(Some(connect_timeout)); 326 Channel::connect(connector, self.clone()).await 327 } else { 328 Channel::connect(connector, self.clone()).await 329 } 330 } 331 332 /// Create a channel from this config. 333 /// 334 /// The channel returned by this method does not attempt to connect to the endpoint until first 335 /// use. connect_lazy(&self) -> Channel336 pub fn connect_lazy(&self) -> Channel { 337 let mut http = hyper::client::connect::HttpConnector::new(); 338 http.enforce_http(false); 339 http.set_nodelay(self.tcp_nodelay); 340 http.set_keepalive(self.tcp_keepalive); 341 342 let connector = self.connector(http); 343 344 if let Some(connect_timeout) = self.connect_timeout { 345 let mut connector = hyper_timeout::TimeoutConnector::new(connector); 346 connector.set_connect_timeout(Some(connect_timeout)); 347 Channel::new(connector, self.clone()) 348 } else { 349 Channel::new(connector, self.clone()) 350 } 351 } 352 353 /// Connect with a custom connector. 354 /// 355 /// This allows you to build a [Channel](struct.Channel.html) that uses a non-HTTP transport. 356 /// See the `uds` example for an example on how to use this function to build channel that 357 /// uses a Unix socket transport. 358 /// 359 /// The [`connect_timeout`](Endpoint::connect_timeout) will still be applied. connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error> where C: MakeConnection<Uri> + Send + 'static, C::Connection: Unpin + Send + 'static, C::Future: Send + 'static, crate::Error: From<C::Error> + Send + 'static,360 pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error> 361 where 362 C: MakeConnection<Uri> + Send + 'static, 363 C::Connection: Unpin + Send + 'static, 364 C::Future: Send + 'static, 365 crate::Error: From<C::Error> + Send + 'static, 366 { 367 let connector = self.connector(connector); 368 369 if let Some(connect_timeout) = self.connect_timeout { 370 let mut connector = hyper_timeout::TimeoutConnector::new(connector); 371 connector.set_connect_timeout(Some(connect_timeout)); 372 Channel::connect(connector, self.clone()).await 373 } else { 374 Channel::connect(connector, self.clone()).await 375 } 376 } 377 378 /// Connect with a custom connector lazily. 379 /// 380 /// This allows you to build a [Channel](struct.Channel.html) that uses a non-HTTP transport 381 /// connect to it lazily. 382 /// 383 /// See the `uds` example for an example on how to use this function to build channel that 384 /// uses a Unix socket transport. connect_with_connector_lazy<C>(&self, connector: C) -> Channel where C: MakeConnection<Uri> + Send + 'static, C::Connection: Unpin + Send + 'static, C::Future: Send + 'static, crate::Error: From<C::Error> + Send + 'static,385 pub fn connect_with_connector_lazy<C>(&self, connector: C) -> Channel 386 where 387 C: MakeConnection<Uri> + Send + 'static, 388 C::Connection: Unpin + Send + 'static, 389 C::Future: Send + 'static, 390 crate::Error: From<C::Error> + Send + 'static, 391 { 392 let connector = self.connector(connector); 393 if let Some(connect_timeout) = self.connect_timeout { 394 let mut connector = hyper_timeout::TimeoutConnector::new(connector); 395 connector.set_connect_timeout(Some(connect_timeout)); 396 Channel::new(connector, self.clone()) 397 } else { 398 Channel::new(connector, self.clone()) 399 } 400 } 401 402 /// Get the endpoint uri. 403 /// 404 /// ``` 405 /// # use tonic::transport::Endpoint; 406 /// # use http::Uri; 407 /// let endpoint = Endpoint::from_static("https://example.com"); 408 /// 409 /// assert_eq!(endpoint.uri(), &Uri::from_static("https://example.com")); 410 /// ``` uri(&self) -> &Uri411 pub fn uri(&self) -> &Uri { 412 &self.uri 413 } 414 } 415 416 impl From<Uri> for Endpoint { from(uri: Uri) -> Self417 fn from(uri: Uri) -> Self { 418 Self { 419 uri, 420 origin: None, 421 user_agent: None, 422 concurrency_limit: None, 423 rate_limit: None, 424 timeout: None, 425 #[cfg(feature = "tls")] 426 tls: None, 427 buffer_size: None, 428 init_stream_window_size: None, 429 init_connection_window_size: None, 430 tcp_keepalive: None, 431 tcp_nodelay: true, 432 http2_keep_alive_interval: None, 433 http2_keep_alive_timeout: None, 434 http2_keep_alive_while_idle: None, 435 connect_timeout: None, 436 http2_adaptive_window: None, 437 executor: SharedExec::tokio(), 438 } 439 } 440 } 441 442 impl TryFrom<Bytes> for Endpoint { 443 type Error = Error; 444 try_from(t: Bytes) -> Result<Self, Self::Error>445 fn try_from(t: Bytes) -> Result<Self, Self::Error> { 446 Self::from_shared(t) 447 } 448 } 449 450 impl TryFrom<String> for Endpoint { 451 type Error = Error; 452 try_from(t: String) -> Result<Self, Self::Error>453 fn try_from(t: String) -> Result<Self, Self::Error> { 454 Self::from_shared(t.into_bytes()) 455 } 456 } 457 458 impl TryFrom<&'static str> for Endpoint { 459 type Error = Error; 460 try_from(t: &'static str) -> Result<Self, Self::Error>461 fn try_from(t: &'static str) -> Result<Self, Self::Error> { 462 Self::from_shared(t.as_bytes()) 463 } 464 } 465 466 impl fmt::Debug for Endpoint { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result467 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 468 f.debug_struct("Endpoint").finish() 469 } 470 } 471 472 impl FromStr for Endpoint { 473 type Err = Error; 474 from_str(s: &str) -> Result<Self, Self::Err>475 fn from_str(s: &str) -> Result<Self, Self::Err> { 476 Self::try_from(s.to_string()) 477 } 478 } 479