//! Client implementation and builder. mod endpoint; #[cfg(feature = "tls")] #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] mod tls; pub use endpoint::Endpoint; #[cfg(feature = "tls")] pub use tls::ClientTlsConfig; use super::service::{Connection, DynamicServiceStream, SharedExec}; use crate::body::BoxBody; use crate::transport::Executor; use bytes::Bytes; use http::{ uri::{InvalidUri, Uri}, Request, Response, }; use hyper::client::connect::Connection as HyperConnection; use std::{ fmt, future::Future, hash::Hash, pin::Pin, task::{ready, Context, Poll}, }; use tokio::{ io::{AsyncRead, AsyncWrite}, sync::mpsc::{channel, Sender}, }; use tower::balance::p2c::Balance; use tower::{ buffer::{self, Buffer}, discover::{Change, Discover}, util::{BoxService, Either}, Service, }; type Svc = Either, Response, crate::Error>>; const DEFAULT_BUFFER_SIZE: usize = 1024; /// A default batteries included `transport` channel. /// /// This provides a fully featured http2 gRPC client based on [`hyper::Client`] /// and `tower` services. /// /// # Multiplexing requests /// /// Sending a request on a channel requires a `&mut self` and thus can only send /// one request in flight. This is intentional and is required to follow the `Service` /// contract from the `tower` library which this channel implementation is built on /// top of. /// /// `tower` itself has a concept of `poll_ready` which is the main mechanism to apply /// back pressure. `poll_ready` takes a `&mut self` and when it returns `Poll::Ready` /// we know the `Service` is able to accept only one request before we must `poll_ready` /// again. Due to this fact any `async fn` that wants to poll for readiness and submit /// the request must have a `&mut self` reference. /// /// To work around this and to ease the use of the channel, `Channel` provides a /// `Clone` implementation that is _cheap_. This is because at the very top level /// the channel is backed by a `tower_buffer::Buffer` which runs the connection /// in a background task and provides a `mpsc` channel interface. Due to this /// cloning the `Channel` type is cheap and encouraged. #[derive(Clone)] pub struct Channel { svc: Buffer>, } /// A future that resolves to an HTTP response. /// /// This is returned by the `Service::call` on [`Channel`]. pub struct ResponseFuture { inner: buffer::future::ResponseFuture<>>::Future>, } impl Channel { /// Create an [`Endpoint`] builder that can create [`Channel`]s. pub fn builder(uri: Uri) -> Endpoint { Endpoint::from(uri) } /// Create an [`Endpoint`] from a static string. /// /// ``` /// # use tonic::transport::Channel; /// Channel::from_static("https://example.com"); /// ``` pub fn from_static(s: &'static str) -> Endpoint { let uri = Uri::from_static(s); Self::builder(uri) } /// Create an [`Endpoint`] from shared bytes. /// /// ``` /// # use tonic::transport::Channel; /// Channel::from_shared("https://example.com"); /// ``` pub fn from_shared(s: impl Into) -> Result { let uri = Uri::from_maybe_shared(s.into())?; Ok(Self::builder(uri)) } /// Balance a list of [`Endpoint`]'s. /// /// This creates a [`Channel`] that will load balance across all the /// provided endpoints. pub fn balance_list(list: impl Iterator) -> Self { let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE); list.for_each(|endpoint| { tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint)) .unwrap(); }); channel } /// Balance a list of [`Endpoint`]'s. /// /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. pub fn balance_channel(capacity: usize) -> (Self, Sender>) where K: Hash + Eq + Send + Clone + 'static, { Self::balance_channel_with_executor(capacity, SharedExec::tokio()) } /// Balance a list of [`Endpoint`]'s. /// /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. /// /// The [`Channel`] will use the given executor to spawn async tasks. pub fn balance_channel_with_executor( capacity: usize, executor: E, ) -> (Self, Sender>) where K: Hash + Eq + Send + Clone + 'static, E: Executor + Send>>> + Send + Sync + 'static, { let (tx, rx) = channel(capacity); let list = DynamicServiceStream::new(rx); (Self::balance(list, DEFAULT_BUFFER_SIZE, executor), tx) } pub(crate) fn new(connector: C, endpoint: Endpoint) -> Self where C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, { let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE); let executor = endpoint.executor.clone(); let svc = Connection::lazy(connector, endpoint); let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size); executor.execute(Box::pin(worker)); Channel { svc } } pub(crate) async fn connect(connector: C, endpoint: Endpoint) -> Result where C: Service + Send + 'static, C::Error: Into + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, { let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE); let executor = endpoint.executor.clone(); let svc = Connection::connect(connector, endpoint) .await .map_err(super::Error::from_source)?; let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size); executor.execute(Box::pin(worker)); Ok(Channel { svc }) } pub(crate) fn balance(discover: D, buffer_size: usize, executor: E) -> Self where D: Discover + Unpin + Send + 'static, D::Error: Into, D::Key: Hash + Send + Clone, E: Executor> + Send + Sync + 'static, { let svc = Balance::new(discover); let svc = BoxService::new(svc); let (svc, worker) = Buffer::pair(Either::B(svc), buffer_size); executor.execute(Box::pin(worker)); Channel { svc } } } impl Service> for Channel { type Response = http::Response; type Error = super::Error; type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source) } fn call(&mut self, request: http::Request) -> Self::Future { let inner = Service::call(&mut self.svc, request); ResponseFuture { inner } } } impl Future for ResponseFuture { type Output = Result, super::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let val = ready!(Pin::new(&mut self.inner).poll(cx)).map_err(super::Error::from_source)?; Ok(val).into() } } impl fmt::Debug for Channel { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Channel").finish() } } impl fmt::Debug for ResponseFuture { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ResponseFuture").finish() } }