• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Client implementation and builder.
2 
3 mod endpoint;
4 #[cfg(feature = "tls")]
5 #[cfg_attr(docsrs, doc(cfg(feature = "tls")))]
6 mod tls;
7 
8 pub use endpoint::Endpoint;
9 #[cfg(feature = "tls")]
10 pub use tls::ClientTlsConfig;
11 
12 use super::service::{Connection, DynamicServiceStream, SharedExec};
13 use crate::body::BoxBody;
14 use crate::transport::Executor;
15 use bytes::Bytes;
16 use http::{
17     uri::{InvalidUri, Uri},
18     Request, Response,
19 };
20 use hyper::client::connect::Connection as HyperConnection;
21 use std::{
22     fmt,
23     future::Future,
24     hash::Hash,
25     pin::Pin,
26     task::{ready, Context, Poll},
27 };
28 use tokio::{
29     io::{AsyncRead, AsyncWrite},
30     sync::mpsc::{channel, Sender},
31 };
32 
33 use tower::balance::p2c::Balance;
34 use tower::{
35     buffer::{self, Buffer},
36     discover::{Change, Discover},
37     util::{BoxService, Either},
38     Service,
39 };
40 
41 type Svc = Either<Connection, BoxService<Request<BoxBody>, Response<hyper::Body>, crate::Error>>;
42 
43 const DEFAULT_BUFFER_SIZE: usize = 1024;
44 
45 /// A default batteries included `transport` channel.
46 ///
47 /// This provides a fully featured http2 gRPC client based on [`hyper::Client`]
48 /// and `tower` services.
49 ///
50 /// # Multiplexing requests
51 ///
52 /// Sending a request on a channel requires a `&mut self` and thus can only send
53 /// one request in flight. This is intentional and is required to follow the `Service`
54 /// contract from the `tower` library which this channel implementation is built on
55 /// top of.
56 ///
57 /// `tower` itself has a concept of `poll_ready` which is the main mechanism to apply
58 /// back pressure. `poll_ready` takes a `&mut self` and when it returns `Poll::Ready`
59 /// we know the `Service` is able to accept only one request before we must `poll_ready`
60 /// again. Due to this fact any `async fn` that wants to poll for readiness and submit
61 /// the request must have a `&mut self` reference.
62 ///
63 /// To work around this and to ease the use of the channel, `Channel` provides a
64 /// `Clone` implementation that is _cheap_. This is because at the very top level
65 /// the channel is backed by a `tower_buffer::Buffer` which runs the connection
66 /// in a background task and provides a `mpsc` channel interface. Due to this
67 /// cloning the `Channel` type is cheap and encouraged.
68 #[derive(Clone)]
69 pub struct Channel {
70     svc: Buffer<Svc, Request<BoxBody>>,
71 }
72 
73 /// A future that resolves to an HTTP response.
74 ///
75 /// This is returned by the `Service::call` on [`Channel`].
76 pub struct ResponseFuture {
77     inner: buffer::future::ResponseFuture<<Svc as Service<Request<BoxBody>>>::Future>,
78 }
79 
80 impl Channel {
81     /// Create an [`Endpoint`] builder that can create [`Channel`]s.
82     pub fn builder(uri: Uri) -> Endpoint {
83         Endpoint::from(uri)
84     }
85 
86     /// Create an [`Endpoint`] from a static string.
87     ///
88     /// ```
89     /// # use tonic::transport::Channel;
90     /// Channel::from_static("https://example.com");
91     /// ```
92     pub fn from_static(s: &'static str) -> Endpoint {
93         let uri = Uri::from_static(s);
94         Self::builder(uri)
95     }
96 
97     /// Create an [`Endpoint`] from shared bytes.
98     ///
99     /// ```
100     /// # use tonic::transport::Channel;
101     /// Channel::from_shared("https://example.com");
102     /// ```
103     pub fn from_shared(s: impl Into<Bytes>) -> Result<Endpoint, InvalidUri> {
104         let uri = Uri::from_maybe_shared(s.into())?;
105         Ok(Self::builder(uri))
106     }
107 
108     /// Balance a list of [`Endpoint`]'s.
109     ///
110     /// This creates a [`Channel`] that will load balance across all the
111     /// provided endpoints.
112     pub fn balance_list(list: impl Iterator<Item = Endpoint>) -> Self {
113         let (channel, tx) = Self::balance_channel(DEFAULT_BUFFER_SIZE);
114         list.for_each(|endpoint| {
115             tx.try_send(Change::Insert(endpoint.uri.clone(), endpoint))
116                 .unwrap();
117         });
118 
119         channel
120     }
121 
122     /// Balance a list of [`Endpoint`]'s.
123     ///
124     /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
125     pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
126     where
127         K: Hash + Eq + Send + Clone + 'static,
128     {
129         Self::balance_channel_with_executor(capacity, SharedExec::tokio())
130     }
131 
132     /// Balance a list of [`Endpoint`]'s.
133     ///
134     /// This creates a [`Channel`] that will listen to a stream of change events and will add or remove provided endpoints.
135     ///
136     /// The [`Channel`] will use the given executor to spawn async tasks.
balance_channel_with_executor<K, E>( capacity: usize, executor: E, ) -> (Self, Sender<Change<K, Endpoint>>) where K: Hash + Eq + Send + Clone + 'static, E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,137     pub fn balance_channel_with_executor<K, E>(
138         capacity: usize,
139         executor: E,
140     ) -> (Self, Sender<Change<K, Endpoint>>)
141     where
142         K: Hash + Eq + Send + Clone + 'static,
143         E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
144     {
145         let (tx, rx) = channel(capacity);
146         let list = DynamicServiceStream::new(rx);
147         (Self::balance(list, DEFAULT_BUFFER_SIZE, executor), tx)
148     }
149 
new<C>(connector: C, endpoint: Endpoint) -> Self where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,150     pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self
151     where
152         C: Service<Uri> + Send + 'static,
153         C::Error: Into<crate::Error> + Send,
154         C::Future: Unpin + Send,
155         C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
156     {
157         let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
158         let executor = endpoint.executor.clone();
159 
160         let svc = Connection::lazy(connector, endpoint);
161         let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size);
162         executor.execute(Box::pin(worker));
163 
164         Channel { svc }
165     }
166 
connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error> where C: Service<Uri> + Send + 'static, C::Error: Into<crate::Error> + Send, C::Future: Unpin + Send, C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,167     pub(crate) async fn connect<C>(connector: C, endpoint: Endpoint) -> Result<Self, super::Error>
168     where
169         C: Service<Uri> + Send + 'static,
170         C::Error: Into<crate::Error> + Send,
171         C::Future: Unpin + Send,
172         C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
173     {
174         let buffer_size = endpoint.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE);
175         let executor = endpoint.executor.clone();
176 
177         let svc = Connection::connect(connector, endpoint)
178             .await
179             .map_err(super::Error::from_source)?;
180         let (svc, worker) = Buffer::pair(Either::A(svc), buffer_size);
181         executor.execute(Box::pin(worker));
182 
183         Ok(Channel { svc })
184     }
185 
balance<D, E>(discover: D, buffer_size: usize, executor: E) -> Self where D: Discover<Service = Connection> + Unpin + Send + 'static, D::Error: Into<crate::Error>, D::Key: Hash + Send + Clone, E: Executor<crate::transport::BoxFuture<'static, ()>> + Send + Sync + 'static,186     pub(crate) fn balance<D, E>(discover: D, buffer_size: usize, executor: E) -> Self
187     where
188         D: Discover<Service = Connection> + Unpin + Send + 'static,
189         D::Error: Into<crate::Error>,
190         D::Key: Hash + Send + Clone,
191         E: Executor<crate::transport::BoxFuture<'static, ()>> + Send + Sync + 'static,
192     {
193         let svc = Balance::new(discover);
194 
195         let svc = BoxService::new(svc);
196         let (svc, worker) = Buffer::pair(Either::B(svc), buffer_size);
197         executor.execute(Box::pin(worker));
198 
199         Channel { svc }
200     }
201 }
202 
203 impl Service<http::Request<BoxBody>> for Channel {
204     type Response = http::Response<super::Body>;
205     type Error = super::Error;
206     type Future = ResponseFuture;
207 
poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>208     fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
209         Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source)
210     }
211 
call(&mut self, request: http::Request<BoxBody>) -> Self::Future212     fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future {
213         let inner = Service::call(&mut self.svc, request);
214 
215         ResponseFuture { inner }
216     }
217 }
218 
219 impl Future for ResponseFuture {
220     type Output = Result<Response<hyper::Body>, super::Error>;
221 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>222     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
223         let val = ready!(Pin::new(&mut self.inner).poll(cx)).map_err(super::Error::from_source)?;
224         Ok(val).into()
225     }
226 }
227 
228 impl fmt::Debug for Channel {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result229     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
230         f.debug_struct("Channel").finish()
231     }
232 }
233 
234 impl fmt::Debug for ResponseFuture {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result235     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236         f.debug_struct("ResponseFuture").finish()
237     }
238 }
239