1 use std::io; 2 use std::io::IoSlice; 3 use std::pin::Pin; 4 use std::task::{Context, Poll}; 5 use std::time::Duration; 6 7 use hyper::client::connect::{Connected, Connection}; 8 use pin_project_lite::pin_project; 9 use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; 10 use tokio_io_timeout::TimeoutStream; 11 12 pin_project! { 13 /// A timeout stream that implements required traits to be a Connector 14 #[derive(Debug)] 15 pub struct TimeoutConnectorStream<S> { 16 #[pin] 17 stream: TimeoutStream<S> 18 } 19 } 20 21 impl<S> TimeoutConnectorStream<S> 22 where 23 S: AsyncRead + AsyncWrite + Unpin, 24 { 25 /// Returns a new `TimeoutConnectorStream` wrapping the specified stream. 26 /// 27 /// There is initially no read or write timeout. new(stream: TimeoutStream<S>) -> TimeoutConnectorStream<S>28 pub fn new(stream: TimeoutStream<S>) -> TimeoutConnectorStream<S> { 29 TimeoutConnectorStream { stream } 30 } 31 32 /// Returns the current read timeout. read_timeout(&self) -> Option<Duration>33 pub fn read_timeout(&self) -> Option<Duration> { 34 self.stream.read_timeout() 35 } 36 37 /// Sets the read timeout. 38 /// 39 /// This can only be used before the stream is pinned; use 40 /// [`set_read_timeout_pinned`](Self::set_read_timeout_pinned) otherwise. set_read_timeout(&mut self, timeout: Option<Duration>)41 pub fn set_read_timeout(&mut self, timeout: Option<Duration>) { 42 self.stream.set_read_timeout(timeout) 43 } 44 45 /// Sets the read timeout. 46 /// 47 /// This will reset any pending read timeout. Use 48 /// [`set_read_timeout`](Self::set_read_timeout) instead if the stream has not yet been pinned. set_read_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>)49 pub fn set_read_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>) { 50 self.project() 51 .stream 52 .as_mut() 53 .set_read_timeout_pinned(timeout) 54 } 55 56 /// Returns the current write timeout. write_timeout(&self) -> Option<Duration>57 pub fn write_timeout(&self) -> Option<Duration> { 58 self.stream.write_timeout() 59 } 60 61 /// Sets the write timeout. 62 /// 63 /// This can only be used before the stream is pinned; use 64 /// [`set_write_timeout_pinned`](Self::set_write_timeout_pinned) otherwise. set_write_timeout(&mut self, timeout: Option<Duration>)65 pub fn set_write_timeout(&mut self, timeout: Option<Duration>) { 66 self.stream.set_write_timeout(timeout) 67 } 68 69 /// Sets the write timeout. 70 /// 71 /// This will reset any pending write timeout. Use 72 /// [`set_write_timeout`](Self::set_write_timeout) instead if the stream has not yet been 73 /// pinned. set_write_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>)74 pub fn set_write_timeout_pinned(self: Pin<&mut Self>, timeout: Option<Duration>) { 75 self.project() 76 .stream 77 .as_mut() 78 .set_write_timeout_pinned(timeout) 79 } 80 81 /// Returns a shared reference to the inner stream. get_ref(&self) -> &S82 pub fn get_ref(&self) -> &S { 83 self.stream.get_ref() 84 } 85 86 /// Returns a mutable reference to the inner stream. get_mut(&mut self) -> &mut S87 pub fn get_mut(&mut self) -> &mut S { 88 self.stream.get_mut() 89 } 90 91 /// Returns a pinned mutable reference to the inner stream. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S>92 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut S> { 93 self.project().stream.get_pin_mut() 94 } 95 96 /// Consumes the stream, returning the inner stream. into_inner(self) -> S97 pub fn into_inner(self) -> S { 98 self.stream.into_inner() 99 } 100 } 101 102 impl<S> AsyncRead for TimeoutConnectorStream<S> 103 where 104 S: AsyncRead + AsyncWrite + Unpin, 105 { poll_read( self: Pin<&mut Self>, cx: &mut Context, buf: &mut ReadBuf, ) -> Poll<Result<(), io::Error>>106 fn poll_read( 107 self: Pin<&mut Self>, 108 cx: &mut Context, 109 buf: &mut ReadBuf, 110 ) -> Poll<Result<(), io::Error>> { 111 self.project().stream.poll_read(cx, buf) 112 } 113 } 114 115 impl<S> AsyncWrite for TimeoutConnectorStream<S> 116 where 117 S: AsyncRead + AsyncWrite + Unpin, 118 { poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize, io::Error>>119 fn poll_write( 120 self: Pin<&mut Self>, 121 cx: &mut Context<'_>, 122 buf: &[u8], 123 ) -> Poll<Result<usize, io::Error>> { 124 self.project().stream.poll_write(cx, buf) 125 } 126 poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize, io::Error>>127 fn poll_write_vectored( 128 self: Pin<&mut Self>, 129 cx: &mut Context<'_>, 130 bufs: &[IoSlice<'_>], 131 ) -> Poll<Result<usize, io::Error>> { 132 self.project().stream.poll_write_vectored(cx, bufs) 133 } 134 is_write_vectored(&self) -> bool135 fn is_write_vectored(&self) -> bool { 136 self.stream.is_write_vectored() 137 } 138 poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>139 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { 140 self.project().stream.poll_flush(cx) 141 } 142 poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>143 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> { 144 self.project().stream.poll_shutdown(cx) 145 } 146 } 147 148 impl<S> Connection for TimeoutConnectorStream<S> 149 where 150 S: AsyncRead + AsyncWrite + Connection + Unpin, 151 { connected(&self) -> Connected152 fn connected(&self) -> Connected { 153 self.stream.get_ref().connected() 154 } 155 } 156 157 impl<S> Connection for Pin<Box<TimeoutConnectorStream<S>>> 158 where 159 S: AsyncRead + AsyncWrite + Connection + Unpin, 160 { connected(&self) -> Connected161 fn connected(&self) -> Connected { 162 self.stream.get_ref().connected() 163 } 164 } 165