• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use super::connection::Connection;
2 use crate::transport::Endpoint;
3 
4 use std::{
5     hash::Hash,
6     pin::Pin,
7     task::{Context, Poll},
8 };
9 use tokio::sync::mpsc::Receiver;
10 
11 use tokio_stream::Stream;
12 use tower::discover::Change;
13 
14 type DiscoverResult<K, S, E> = Result<Change<K, S>, E>;
15 
16 pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> {
17     changes: Receiver<Change<K, Endpoint>>,
18 }
19 
20 impl<K: Hash + Eq + Clone> DynamicServiceStream<K> {
new(changes: Receiver<Change<K, Endpoint>>) -> Self21     pub(crate) fn new(changes: Receiver<Change<K, Endpoint>>) -> Self {
22         Self { changes }
23     }
24 }
25 
26 impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> {
27     type Item = DiscoverResult<K, Connection, crate::Error>;
28 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>29     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
30         let c = &mut self.changes;
31         match Pin::new(&mut *c).poll_recv(cx) {
32             Poll::Pending | Poll::Ready(None) => Poll::Pending,
33             Poll::Ready(Some(change)) => match change {
34                 Change::Insert(k, endpoint) => {
35                     let mut http = hyper::client::connect::HttpConnector::new();
36                     http.set_nodelay(endpoint.tcp_nodelay);
37                     http.set_keepalive(endpoint.tcp_keepalive);
38                     http.set_connect_timeout(endpoint.connect_timeout);
39                     http.enforce_http(false);
40 
41                     let connection = Connection::lazy(endpoint.connector(http), endpoint);
42                     let change = Ok(Change::Insert(k, connection));
43                     Poll::Ready(Some(change))
44                 }
45                 Change::Remove(k) => Poll::Ready(Some(Ok(Change::Remove(k)))),
46             },
47         }
48     }
49 }
50 
51 impl<K: Hash + Eq + Clone> Unpin for DynamicServiceStream<K> {}
52