1 use super::connection::Connection; 2 use crate::transport::Endpoint; 3 4 use std::{ 5 hash::Hash, 6 pin::Pin, 7 task::{Context, Poll}, 8 }; 9 use tokio::sync::mpsc::Receiver; 10 11 use tokio_stream::Stream; 12 use tower::discover::Change; 13 14 type DiscoverResult<K, S, E> = Result<Change<K, S>, E>; 15 16 pub(crate) struct DynamicServiceStream<K: Hash + Eq + Clone> { 17 changes: Receiver<Change<K, Endpoint>>, 18 } 19 20 impl<K: Hash + Eq + Clone> DynamicServiceStream<K> { new(changes: Receiver<Change<K, Endpoint>>) -> Self21 pub(crate) fn new(changes: Receiver<Change<K, Endpoint>>) -> Self { 22 Self { changes } 23 } 24 } 25 26 impl<K: Hash + Eq + Clone> Stream for DynamicServiceStream<K> { 27 type Item = DiscoverResult<K, Connection, crate::Error>; 28 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>29 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 30 let c = &mut self.changes; 31 match Pin::new(&mut *c).poll_recv(cx) { 32 Poll::Pending | Poll::Ready(None) => Poll::Pending, 33 Poll::Ready(Some(change)) => match change { 34 Change::Insert(k, endpoint) => { 35 let mut http = hyper::client::connect::HttpConnector::new(); 36 http.set_nodelay(endpoint.tcp_nodelay); 37 http.set_keepalive(endpoint.tcp_keepalive); 38 http.set_connect_timeout(endpoint.connect_timeout); 39 http.enforce_http(false); 40 41 let connection = Connection::lazy(endpoint.connector(http), endpoint); 42 let change = Ok(Change::Insert(k, connection)); 43 Poll::Ready(Some(change)) 44 } 45 Change::Remove(k) => Poll::Ready(Some(Ok(Change::Remove(k)))), 46 }, 47 } 48 } 49 } 50 51 impl<K: Hash + Eq + Clone> Unpin for DynamicServiceStream<K> {} 52