use crate::transport::server::Connected; use hyper::client::connect::{Connected as HyperConnected, Connection}; use std::io; use std::io::IoSlice; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; #[cfg(feature = "tls")] use tokio_rustls::server::TlsStream; pub(in crate::transport) trait Io: AsyncRead + AsyncWrite + Send + 'static { } impl Io for T where T: AsyncRead + AsyncWrite + Send + 'static {} pub(crate) struct BoxedIo(Pin>); impl BoxedIo { pub(in crate::transport) fn new(io: I) -> Self { BoxedIo(Box::pin(io)) } } impl Connection for BoxedIo { fn connected(&self) -> HyperConnected { HyperConnected::new() } } impl Connected for BoxedIo { type ConnectInfo = NoneConnectInfo; fn connect_info(&self) -> Self::ConnectInfo { NoneConnectInfo } } #[derive(Copy, Clone)] pub(crate) struct NoneConnectInfo; impl AsyncRead for BoxedIo { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { Pin::new(&mut self.0).poll_read(cx, buf) } } impl AsyncWrite for BoxedIo { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { Pin::new(&mut self.0).poll_write(cx, buf) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.0).poll_flush(cx) } fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.0).poll_shutdown(cx) } fn poll_write_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { Pin::new(&mut self.0).poll_write_vectored(cx, bufs) } fn is_write_vectored(&self) -> bool { self.0.is_write_vectored() } } pub(crate) enum ServerIo { Io(IO), #[cfg(feature = "tls")] TlsIo(Box>), } use tower::util::Either; #[cfg(feature = "tls")] type ServerIoConnectInfo = Either<::ConnectInfo, as Connected>::ConnectInfo>; #[cfg(not(feature = "tls"))] type ServerIoConnectInfo = Either<::ConnectInfo, ()>; impl ServerIo { pub(in crate::transport) fn new_io(io: IO) -> Self { Self::Io(io) } #[cfg(feature = "tls")] pub(in crate::transport) fn new_tls_io(io: TlsStream) -> Self { Self::TlsIo(Box::new(io)) } #[cfg(feature = "tls")] pub(in crate::transport) fn connect_info(&self) -> ServerIoConnectInfo where IO: Connected, TlsStream: Connected, { match self { Self::Io(io) => Either::A(io.connect_info()), Self::TlsIo(io) => Either::B(io.connect_info()), } } #[cfg(not(feature = "tls"))] pub(in crate::transport) fn connect_info(&self) -> ServerIoConnectInfo where IO: Connected, { match self { Self::Io(io) => Either::A(io.connect_info()), } } } impl AsyncRead for ServerIo where IO: AsyncWrite + AsyncRead + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { match &mut *self { Self::Io(io) => Pin::new(io).poll_read(cx, buf), #[cfg(feature = "tls")] Self::TlsIo(io) => Pin::new(io).poll_read(cx, buf), } } } impl AsyncWrite for ServerIo where IO: AsyncWrite + AsyncRead + Unpin, { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { match &mut *self { Self::Io(io) => Pin::new(io).poll_write(cx, buf), #[cfg(feature = "tls")] Self::TlsIo(io) => Pin::new(io).poll_write(cx, buf), } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut *self { Self::Io(io) => Pin::new(io).poll_flush(cx), #[cfg(feature = "tls")] Self::TlsIo(io) => Pin::new(io).poll_flush(cx), } } fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &mut *self { Self::Io(io) => Pin::new(io).poll_shutdown(cx), #[cfg(feature = "tls")] Self::TlsIo(io) => Pin::new(io).poll_shutdown(cx), } } fn poll_write_vectored( mut self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { match &mut *self { Self::Io(io) => Pin::new(io).poll_write_vectored(cx, bufs), #[cfg(feature = "tls")] Self::TlsIo(io) => Pin::new(io).poll_write_vectored(cx, bufs), } } fn is_write_vectored(&self) -> bool { match self { Self::Io(io) => io.is_write_vectored(), #[cfg(feature = "tls")] Self::TlsIo(io) => io.is_write_vectored(), } } }