1 use std::fmt; 2 use std::io::{self, IoSlice, IoSliceMut, Read, Write}; 3 use std::net::{self, Shutdown, SocketAddr}; 4 #[cfg(unix)] 5 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; 6 #[cfg(target_os = "wasi")] 7 use std::os::wasi::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; 8 #[cfg(windows)] 9 use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket}; 10 11 use crate::io_source::IoSource; 12 #[cfg(not(target_os = "wasi"))] 13 use crate::sys::tcp::{connect, new_for_addr}; 14 use crate::{event, Interest, Registry, Token}; 15 16 /// A non-blocking TCP stream between a local socket and a remote socket. 17 /// 18 /// The socket will be closed when the value is dropped. 19 /// 20 /// # Examples 21 /// 22 #[cfg_attr(feature = "os-poll", doc = "```")] 23 #[cfg_attr(not(feature = "os-poll"), doc = "```ignore")] 24 /// # use std::net::{TcpListener, SocketAddr}; 25 /// # use std::error::Error; 26 /// # 27 /// # fn main() -> Result<(), Box<dyn Error>> { 28 /// let address: SocketAddr = "127.0.0.1:0".parse()?; 29 /// let listener = TcpListener::bind(address)?; 30 /// use mio::{Events, Interest, Poll, Token}; 31 /// use mio::net::TcpStream; 32 /// use std::time::Duration; 33 /// 34 /// let mut stream = TcpStream::connect(listener.local_addr()?)?; 35 /// 36 /// let mut poll = Poll::new()?; 37 /// let mut events = Events::with_capacity(128); 38 /// 39 /// // Register the socket with `Poll` 40 /// poll.registry().register(&mut stream, Token(0), Interest::WRITABLE)?; 41 /// 42 /// poll.poll(&mut events, Some(Duration::from_millis(100)))?; 43 /// 44 /// // The socket might be ready at this point 45 /// # Ok(()) 46 /// # } 47 /// ``` 48 pub struct TcpStream { 49 inner: IoSource<net::TcpStream>, 50 } 51 52 impl TcpStream { 53 /// Create a new TCP stream and issue a non-blocking connect to the 54 /// specified address. 55 /// 56 /// # Notes 57 /// 58 /// The returned `TcpStream` may not be connected (and thus usable), unlike 59 /// the API found in `std::net::TcpStream`. Because Mio issues a 60 /// *non-blocking* connect it will not block the thread and instead return 61 /// an unconnected `TcpStream`. 62 /// 63 /// Ensuring the returned stream is connected is surprisingly complex when 64 /// considering cross-platform support. Doing this properly should follow 65 /// the steps below, an example implementation can be found 66 /// [here](https://github.com/Thomasdezeeuw/heph/blob/0c4f1ab3eaf08bea1d65776528bfd6114c9f8374/src/net/tcp/stream.rs#L560-L622). 67 /// 68 /// 1. Call `TcpStream::connect` 69 /// 2. Register the returned stream with at least [write interest]. 70 /// 3. Wait for a (writable) event. 71 /// 4. Check `TcpStream::peer_addr`. If it returns `libc::EINPROGRESS` or 72 /// `ErrorKind::NotConnected` it means the stream is not yet connected, 73 /// go back to step 3. If it returns an address it means the stream is 74 /// connected, go to step 5. If another error is returned something 75 /// went wrong. 76 /// 5. Now the stream can be used. 77 /// 78 /// This may return a `WouldBlock` in which case the socket connection 79 /// cannot be completed immediately, it usually means there are insufficient 80 /// entries in the routing cache. 81 /// 82 /// [write interest]: Interest::WRITABLE 83 #[cfg(not(target_os = "wasi"))] connect(addr: SocketAddr) -> io::Result<TcpStream>84 pub fn connect(addr: SocketAddr) -> io::Result<TcpStream> { 85 let socket = new_for_addr(addr)?; 86 #[cfg(unix)] 87 let stream = unsafe { TcpStream::from_raw_fd(socket) }; 88 #[cfg(windows)] 89 let stream = unsafe { TcpStream::from_raw_socket(socket as _) }; 90 connect(&stream.inner, addr)?; 91 Ok(stream) 92 } 93 94 /// Creates a new `TcpStream` from a standard `net::TcpStream`. 95 /// 96 /// This function is intended to be used to wrap a TCP stream from the 97 /// standard library in the Mio equivalent. The conversion assumes nothing 98 /// about the underlying stream; it is left up to the user to set it in 99 /// non-blocking mode. 100 /// 101 /// # Note 102 /// 103 /// The TCP stream here will not have `connect` called on it, so it 104 /// should already be connected via some other means (be it manually, or 105 /// the standard library). from_std(stream: net::TcpStream) -> TcpStream106 pub fn from_std(stream: net::TcpStream) -> TcpStream { 107 TcpStream { 108 inner: IoSource::new(stream), 109 } 110 } 111 112 /// Returns the socket address of the remote peer of this TCP connection. peer_addr(&self) -> io::Result<SocketAddr>113 pub fn peer_addr(&self) -> io::Result<SocketAddr> { 114 self.inner.peer_addr() 115 } 116 117 /// Returns the socket address of the local half of this TCP connection. local_addr(&self) -> io::Result<SocketAddr>118 pub fn local_addr(&self) -> io::Result<SocketAddr> { 119 self.inner.local_addr() 120 } 121 122 /// Shuts down the read, write, or both halves of this connection. 123 /// 124 /// This function will cause all pending and future I/O on the specified 125 /// portions to return immediately with an appropriate value (see the 126 /// documentation of `Shutdown`). shutdown(&self, how: Shutdown) -> io::Result<()>127 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { 128 self.inner.shutdown(how) 129 } 130 131 /// Sets the value of the `TCP_NODELAY` option on this socket. 132 /// 133 /// If set, this option disables the Nagle algorithm. This means that 134 /// segments are always sent as soon as possible, even if there is only a 135 /// small amount of data. When not set, data is buffered until there is a 136 /// sufficient amount to send out, thereby avoiding the frequent sending of 137 /// small packets. 138 /// 139 /// # Notes 140 /// 141 /// On Windows make sure the stream is connected before calling this method, 142 /// by receiving an (writable) event. Trying to set `nodelay` on an 143 /// unconnected `TcpStream` is unspecified behavior. set_nodelay(&self, nodelay: bool) -> io::Result<()>144 pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> { 145 self.inner.set_nodelay(nodelay) 146 } 147 148 /// Gets the value of the `TCP_NODELAY` option on this socket. 149 /// 150 /// For more information about this option, see [`set_nodelay`][link]. 151 /// 152 /// [link]: #method.set_nodelay 153 /// 154 /// # Notes 155 /// 156 /// On Windows make sure the stream is connected before calling this method, 157 /// by receiving an (writable) event. Trying to get `nodelay` on an 158 /// unconnected `TcpStream` is unspecified behavior. nodelay(&self) -> io::Result<bool>159 pub fn nodelay(&self) -> io::Result<bool> { 160 self.inner.nodelay() 161 } 162 163 /// Sets the value for the `IP_TTL` option on this socket. 164 /// 165 /// This value sets the time-to-live field that is used in every packet sent 166 /// from this socket. 167 /// 168 /// # Notes 169 /// 170 /// On Windows make sure the stream is connected before calling this method, 171 /// by receiving an (writable) event. Trying to set `ttl` on an 172 /// unconnected `TcpStream` is unspecified behavior. set_ttl(&self, ttl: u32) -> io::Result<()>173 pub fn set_ttl(&self, ttl: u32) -> io::Result<()> { 174 self.inner.set_ttl(ttl) 175 } 176 177 /// Gets the value of the `IP_TTL` option for this socket. 178 /// 179 /// For more information about this option, see [`set_ttl`][link]. 180 /// 181 /// # Notes 182 /// 183 /// On Windows make sure the stream is connected before calling this method, 184 /// by receiving an (writable) event. Trying to get `ttl` on an 185 /// unconnected `TcpStream` is unspecified behavior. 186 /// 187 /// [link]: #method.set_ttl ttl(&self) -> io::Result<u32>188 pub fn ttl(&self) -> io::Result<u32> { 189 self.inner.ttl() 190 } 191 192 /// Get the value of the `SO_ERROR` option on this socket. 193 /// 194 /// This will retrieve the stored error in the underlying socket, clearing 195 /// the field in the process. This can be useful for checking errors between 196 /// calls. take_error(&self) -> io::Result<Option<io::Error>>197 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 198 self.inner.take_error() 199 } 200 201 /// Receives data on the socket from the remote address to which it is 202 /// connected, without removing that data from the queue. On success, 203 /// returns the number of bytes peeked. 204 /// 205 /// Successive calls return the same data. This is accomplished by passing 206 /// `MSG_PEEK` as a flag to the underlying recv system call. peek(&self, buf: &mut [u8]) -> io::Result<usize>207 pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { 208 self.inner.peek(buf) 209 } 210 211 /// Execute an I/O operation ensuring that the socket receives more events 212 /// if it hits a [`WouldBlock`] error. 213 /// 214 /// # Notes 215 /// 216 /// This method is required to be called for **all** I/O operations to 217 /// ensure the user will receive events once the socket is ready again after 218 /// returning a [`WouldBlock`] error. 219 /// 220 /// [`WouldBlock`]: io::ErrorKind::WouldBlock 221 /// 222 /// # Examples 223 /// 224 #[cfg_attr(unix, doc = "```no_run")] 225 #[cfg_attr(windows, doc = "```ignore")] 226 /// # use std::error::Error; 227 /// # 228 /// # fn main() -> Result<(), Box<dyn Error>> { 229 /// use std::io; 230 /// #[cfg(unix)] 231 /// use std::os::unix::io::AsRawFd; 232 /// #[cfg(windows)] 233 /// use std::os::windows::io::AsRawSocket; 234 /// use mio::net::TcpStream; 235 /// 236 /// let address = "127.0.0.1:8080".parse().unwrap(); 237 /// let stream = TcpStream::connect(address)?; 238 /// 239 /// // Wait until the stream is readable... 240 /// 241 /// // Read from the stream using a direct libc call, of course the 242 /// // `io::Read` implementation would be easier to use. 243 /// let mut buf = [0; 512]; 244 /// let n = stream.try_io(|| { 245 /// let buf_ptr = &mut buf as *mut _ as *mut _; 246 /// #[cfg(unix)] 247 /// let res = unsafe { libc::recv(stream.as_raw_fd(), buf_ptr, buf.len(), 0) }; 248 /// #[cfg(windows)] 249 /// let res = unsafe { libc::recvfrom(stream.as_raw_socket() as usize, buf_ptr, buf.len() as i32, 0, std::ptr::null_mut(), std::ptr::null_mut()) }; 250 /// if res != -1 { 251 /// Ok(res as usize) 252 /// } else { 253 /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure 254 /// // should return `WouldBlock` error. 255 /// Err(io::Error::last_os_error()) 256 /// } 257 /// })?; 258 /// eprintln!("read {} bytes", n); 259 /// # Ok(()) 260 /// # } 261 /// ``` try_io<F, T>(&self, f: F) -> io::Result<T> where F: FnOnce() -> io::Result<T>,262 pub fn try_io<F, T>(&self, f: F) -> io::Result<T> 263 where 264 F: FnOnce() -> io::Result<T>, 265 { 266 self.inner.do_io(|_| f()) 267 } 268 } 269 270 impl Read for TcpStream { read(&mut self, buf: &mut [u8]) -> io::Result<usize>271 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 272 self.inner.do_io(|mut inner| inner.read(buf)) 273 } 274 read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>275 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { 276 self.inner.do_io(|mut inner| inner.read_vectored(bufs)) 277 } 278 } 279 280 impl<'a> Read for &'a TcpStream { read(&mut self, buf: &mut [u8]) -> io::Result<usize>281 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 282 self.inner.do_io(|mut inner| inner.read(buf)) 283 } 284 read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>285 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { 286 self.inner.do_io(|mut inner| inner.read_vectored(bufs)) 287 } 288 } 289 290 impl Write for TcpStream { write(&mut self, buf: &[u8]) -> io::Result<usize>291 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 292 self.inner.do_io(|mut inner| inner.write(buf)) 293 } 294 write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>295 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { 296 self.inner.do_io(|mut inner| inner.write_vectored(bufs)) 297 } 298 flush(&mut self) -> io::Result<()>299 fn flush(&mut self) -> io::Result<()> { 300 self.inner.do_io(|mut inner| inner.flush()) 301 } 302 } 303 304 impl<'a> Write for &'a TcpStream { write(&mut self, buf: &[u8]) -> io::Result<usize>305 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 306 self.inner.do_io(|mut inner| inner.write(buf)) 307 } 308 write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>309 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { 310 self.inner.do_io(|mut inner| inner.write_vectored(bufs)) 311 } 312 flush(&mut self) -> io::Result<()>313 fn flush(&mut self) -> io::Result<()> { 314 self.inner.do_io(|mut inner| inner.flush()) 315 } 316 } 317 318 impl event::Source for TcpStream { register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>319 fn register( 320 &mut self, 321 registry: &Registry, 322 token: Token, 323 interests: Interest, 324 ) -> io::Result<()> { 325 self.inner.register(registry, token, interests) 326 } 327 reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>328 fn reregister( 329 &mut self, 330 registry: &Registry, 331 token: Token, 332 interests: Interest, 333 ) -> io::Result<()> { 334 self.inner.reregister(registry, token, interests) 335 } 336 deregister(&mut self, registry: &Registry) -> io::Result<()>337 fn deregister(&mut self, registry: &Registry) -> io::Result<()> { 338 self.inner.deregister(registry) 339 } 340 } 341 342 impl fmt::Debug for TcpStream { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result343 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 344 self.inner.fmt(f) 345 } 346 } 347 348 #[cfg(unix)] 349 impl IntoRawFd for TcpStream { into_raw_fd(self) -> RawFd350 fn into_raw_fd(self) -> RawFd { 351 self.inner.into_inner().into_raw_fd() 352 } 353 } 354 355 #[cfg(unix)] 356 impl AsRawFd for TcpStream { as_raw_fd(&self) -> RawFd357 fn as_raw_fd(&self) -> RawFd { 358 self.inner.as_raw_fd() 359 } 360 } 361 362 #[cfg(unix)] 363 impl FromRawFd for TcpStream { 364 /// Converts a `RawFd` to a `TcpStream`. 365 /// 366 /// # Notes 367 /// 368 /// The caller is responsible for ensuring that the socket is in 369 /// non-blocking mode. from_raw_fd(fd: RawFd) -> TcpStream370 unsafe fn from_raw_fd(fd: RawFd) -> TcpStream { 371 TcpStream::from_std(FromRawFd::from_raw_fd(fd)) 372 } 373 } 374 375 #[cfg(windows)] 376 impl IntoRawSocket for TcpStream { into_raw_socket(self) -> RawSocket377 fn into_raw_socket(self) -> RawSocket { 378 self.inner.into_inner().into_raw_socket() 379 } 380 } 381 382 #[cfg(windows)] 383 impl AsRawSocket for TcpStream { as_raw_socket(&self) -> RawSocket384 fn as_raw_socket(&self) -> RawSocket { 385 self.inner.as_raw_socket() 386 } 387 } 388 389 #[cfg(windows)] 390 impl FromRawSocket for TcpStream { 391 /// Converts a `RawSocket` to a `TcpStream`. 392 /// 393 /// # Notes 394 /// 395 /// The caller is responsible for ensuring that the socket is in 396 /// non-blocking mode. from_raw_socket(socket: RawSocket) -> TcpStream397 unsafe fn from_raw_socket(socket: RawSocket) -> TcpStream { 398 TcpStream::from_std(FromRawSocket::from_raw_socket(socket)) 399 } 400 } 401 402 #[cfg(target_os = "wasi")] 403 impl IntoRawFd for TcpStream { into_raw_fd(self) -> RawFd404 fn into_raw_fd(self) -> RawFd { 405 self.inner.into_inner().into_raw_fd() 406 } 407 } 408 409 #[cfg(target_os = "wasi")] 410 impl AsRawFd for TcpStream { as_raw_fd(&self) -> RawFd411 fn as_raw_fd(&self) -> RawFd { 412 self.inner.as_raw_fd() 413 } 414 } 415 416 #[cfg(target_os = "wasi")] 417 impl FromRawFd for TcpStream { 418 /// Converts a `RawFd` to a `TcpStream`. 419 /// 420 /// # Notes 421 /// 422 /// The caller is responsible for ensuring that the socket is in 423 /// non-blocking mode. from_raw_fd(fd: RawFd) -> TcpStream424 unsafe fn from_raw_fd(fd: RawFd) -> TcpStream { 425 TcpStream::from_std(FromRawFd::from_raw_fd(fd)) 426 } 427 } 428