1 //! Client implementation and builder. 2 3 mod endpoint; 4 #[cfg(feature = "tls")] 5 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))] 6 mod tls; 7 8 pub use endpoint::Endpoint; 9 #[cfg(feature = "tls")] 10 pub use tls::ClientTlsConfig; 11 12 use super::service::{Connection, DynamicServiceStream, SharedExec}; 13 use crate::body::BoxBody; 14 use crate::transport::Executor; 15 use bytes::Bytes; 16 use http::{ 17 uri::{InvalidUri, Uri}, 18 Request, Response, 19 }; 20 use hyper::client::connect::Connection as HyperConnection; 21 use std::{ 22 fmt, 23 future::Future, 24 hash::Hash, 25 pin::Pin, 26 task::{ready, Context, Poll}, 27 }; 28 use tokio::{ 29 io::{AsyncRead, AsyncWrite}, 30 sync::mpsc::{channel, Sender}, 31 }; 32 33 use tower::balance::p2c::Balance; 34 use tower::{ 35 buffer::{self, Buffer}, 36 discover::{Change, Discover}, 37 util::{BoxService, Either}, 38 Service, 39 }; 40 41 type Svc = Either<Connection, BoxService<Request<BoxBody>, Response<hyper::Body>, crate::Error>>; 42 43 const DEFAULT_BUFFER_SIZE: usize = 1024; 44 45 /// A default batteries included `transport` channel. 46 /// 47 /// This provides a fully featured http2 gRPC client based on [`hyper::Client`] 48 /// and `tower` services. 49 /// 50 /// # Multiplexing requests 51 /// 52 /// Sending a request on a channel requires a `&mut self` and thus can only send 53 /// one request in flight. This is intentional and is required to follow the `Service` 54 /// contract from the `tower` library which this channel implementation is built on 55 /// top of. 56 /// 57 /// `tower` itself has a concept of `poll_ready` which is the main mechanism to apply 58 /// back pressure. `poll_ready` takes a `&mut self` and when it returns `Poll::Ready` 59 /// we know the `Service` is able to accept only one request before we must `poll_ready` 60 /// again. Due to this fact any `async fn` that wants to poll for readiness and submit 61 /// the request must have a `&mut self` reference. 62 /// 63 /// To work around this and to ease the use of the channel, `Channel` provides a 64 /// `Clone` implementation that is _cheap_. This is because at the very top level 65 /// the channel is backed by a `tower_buffer::Buffer` which runs the connection 66 /// in a background task and provides a `mpsc` channel interface. Due to this 67 /// cloning the `Channel` type is cheap and encouraged. 68 #[derive(Clone)] 69 pub struct Channel { 70 svc: Buffer<Svc, Request<BoxBody>>, 71 } 72 73 /// A future that resolves to an HTTP response. 74 /// 75 /// This is returned by the `Service::call` on [`Channel`]. 76 pub struct ResponseFuture { 77 inner: buffer::future::ResponseFuture<<Svc as Service<Request<BoxBody>>>::Future>, 78 } 79 80 impl Channel { 81 /// Create an [`Endpoint`] builder that can create [`Channel`]s. 82 pub fn builder(uri: Uri) -> Endpoint { 83 Endpoint::from(uri) 84 } 85 86 /// Create an [`Endpoint`] from a static string. 87 /// 88 /// ``` 89 /// # use tonic::transport::Channel; 90 /// Channel::from_static("https://example.com"); 91 /// ``` 92 pub fn from_static(s: &'static str) -> Endpoint { 93 let uri = Uri::from_static(s); 94 Self::builder(uri) 95 } 96 97 /// Create an [`Endpoint`] from shared bytes. 98 /// 99 /// ``` 100 /// # use tonic::transport::Channel; 101 /// Channel::from_shared("https://example.com"); 102 /// ``` 103 pub fn from_shared(s: impl Into<Bytes>) -> Result<Endpoint, InvalidUri> { 104 let uri = Uri::from_maybe_shared(s.into())?; 105 Ok(Self::builder(uri)) 106 } 107 108 /// Balance a list of [`Endpoint`]'s. 109 /// 110 /// This creates a [`Channel`] that will load balance across all the 111 /// provided endpoints. 112 pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self { 113 let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE); 114 list.for_each(|endpoint| { 115 tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint)) 116 .unwrap(); 117 }); 118 119 channel 120 } 121 122 /// Balance a list of [`Endpoint`]'s. 123 /// 124 /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. 125 pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>) 126 where 127 K: Hash + Eq + Send + Clone + 'static, 128 { 129 Self::balance_channel_with_executor(capacity, SharedExec::tokio()) 130 } 131 132 /// Balance a list of [`Endpoint`]'s. 133 /// 134 /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints. 135 /// 136 /// The [`Channel`] will use the given executor to spawn async tasks. balance_channel_with_executor<K, E>( capacity: usize, executor: E, ) -> (Self, Sender<Change<K, Endpoint>>) where K: Hash + Eq + Send + Clone + 'static, E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,137 pub fn balance_channel_with_executor<K, E>( 138 capacity: usize, 139 executor: E, 140 ) -> (Self, Sender<Change<K, Endpoint>>) 141 where 142 K: Hash + Eq + Send + Clone + 'static, 143 E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static, 144 { 145 let (tx, rx) = channel(capacity); 146 let list = DynamicServiceStream::new(rx); 147 (Self::balance(list, DEFAULT_BUFFER_SIZE, executor), tx) 148 } 149 new<C>(connector: C, endpoint: Endpoint) -> Self where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,150 pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self 151 where 152 C: Service<Uri> + Send + 'static, 153 C::Error: Into<crate::Error> + Send, 154 C::Future: Unpin + Send, 155 C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, 156 { 157 let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE); 158 let executor = endpoint.executor.clone(); 159 160 let svc = Connection::lazy(connector, endpoint); 161 let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size); 162 executor.execute(Box::pin(worker)); 163 164 Channel { svc } 165 } 166 connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error> where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,167 pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error> 168 where 169 C: Service<Uri> + Send + 'static, 170 C::Error: Into<crate::Error> + Send, 171 C::Future: Unpin + Send, 172 C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static, 173 { 174 let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE); 175 let executor = endpoint.executor.clone(); 176 177 let svc = Connection::connect(connector, endpoint) 178 .await 179 .map_err(super::Error::from_source)?; 180 let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size); 181 executor.execute(Box::pin(worker)); 182 183 Ok(Channel { svc }) 184 } 185 balance<D, E>(discover: D, buffer_size: usize, executor: E) -> Self where D: Discover<Service = Connection> + Unpin + Send + 'static, D::Error: Into<crate::Error>, D::Key: Hash + Send + Clone, E: Executor<crate::transport::BoxFuture<'static, ()>> + Send + Sync + 'static,186 pub(crate) fn balance<D, E>(discover: D, buffer_size: usize, executor: E) -> Self 187 where 188 D: Discover<Service = Connection> + Unpin + Send + 'static, 189 D::Error: Into<crate::Error>, 190 D::Key: Hash + Send + Clone, 191 E: Executor<crate::transport::BoxFuture<'static, ()>> + Send + Sync + 'static, 192 { 193 let svc = Balance::new(discover); 194 195 let svc = BoxService::new(svc); 196 let (svc, worker) = Buffer::pair(Either::B(svc), buffer_size); 197 executor.execute(Box::pin(worker)); 198 199 Channel { svc } 200 } 201 } 202 203 impl Service<http::Request<BoxBody>> for Channel { 204 type Response = http::Response<super::Body>; 205 type Error = super::Error; 206 type Future = ResponseFuture; 207 poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>208 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { 209 Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source) 210 } 211 call(&mut self, request: http::Request<BoxBody>) -> Self::Future212 fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future { 213 let inner = Service::call(&mut self.svc, request); 214 215 ResponseFuture { inner } 216 } 217 } 218 219 impl Future for ResponseFuture { 220 type Output = Result<Response<hyper::Body>, super::Error>; 221 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>222 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 223 let val = ready!(Pin::new(&mut self.inner).poll(cx)).map_err(super::Error::from_source)?; 224 Ok(val).into() 225 } 226 } 227 228 impl fmt::Debug for Channel { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result229 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 230 f.debug_struct("Channel").finish() 231 } 232 } 233 234 impl fmt::Debug for ResponseFuture { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result235 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 236 f.debug_struct("ResponseFuture").finish() 237 } 238 } 239