• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::fmt;
2 use std::io::{self, IoSlice, IoSliceMut, Read, Write};
3 use std::net::{self, Shutdown, SocketAddr};
4 #[cfg(unix)]
5 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
6 #[cfg(target_os = "wasi")]
7 use std::os::wasi::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
8 #[cfg(windows)]
9 use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
10 
11 use crate::io_source::IoSource;
12 #[cfg(not(target_os = "wasi"))]
13 use crate::sys::tcp::{connect, new_for_addr};
14 use crate::{event, Interest, Registry, Token};
15 
16 /// A non-blocking TCP stream between a local socket and a remote socket.
17 ///
18 /// The socket will be closed when the value is dropped.
19 ///
20 /// # Examples
21 ///
22 #[cfg_attr(feature = "os-poll", doc = "```")]
23 #[cfg_attr(not(feature = "os-poll"), doc = "```ignore")]
24 /// # use std::net::{TcpListener, SocketAddr};
25 /// # use std::error::Error;
26 /// #
27 /// # fn main() -> Result<(), Box<dyn Error>> {
28 /// let address: SocketAddr = "127.0.0.1:0".parse()?;
29 /// let listener = TcpListener::bind(address)?;
30 /// use mio::{Events, Interest, Poll, Token};
31 /// use mio::net::TcpStream;
32 /// use std::time::Duration;
33 ///
34 /// let mut stream = TcpStream::connect(listener.local_addr()?)?;
35 ///
36 /// let mut poll = Poll::new()?;
37 /// let mut events = Events::with_capacity(128);
38 ///
39 /// // Register the socket with `Poll`
40 /// poll.registry().register(&mut stream, Token(0), Interest::WRITABLE)?;
41 ///
42 /// poll.poll(&mut events, Some(Duration::from_millis(100)))?;
43 ///
44 /// // The socket might be ready at this point
45 /// #     Ok(())
46 /// # }
47 /// ```
48 pub struct TcpStream {
49     inner: IoSource<net::TcpStream>,
50 }
51 
52 impl TcpStream {
53     /// Create a new TCP stream and issue a non-blocking connect to the
54     /// specified address.
55     ///
56     /// # Notes
57     ///
58     /// The returned `TcpStream` may not be connected (and thus usable), unlike
59     /// the API found in `std::net::TcpStream`. Because Mio issues a
60     /// *non-blocking* connect it will not block the thread and instead return
61     /// an unconnected `TcpStream`.
62     ///
63     /// Ensuring the returned stream is connected is surprisingly complex when
64     /// considering cross-platform support. Doing this properly should follow
65     /// the steps below, an example implementation can be found
66     /// [here](https://github.com/Thomasdezeeuw/heph/blob/0c4f1ab3eaf08bea1d65776528bfd6114c9f8374/src/net/tcp/stream.rs#L560-L622).
67     ///
68     ///  1. Call `TcpStream::connect`
69     ///  2. Register the returned stream with at least [write interest].
70     ///  3. Wait for a (writable) event.
71     ///  4. Check `TcpStream::peer_addr`. If it returns `libc::EINPROGRESS` or
72     ///     `ErrorKind::NotConnected` it means the stream is not yet connected,
73     ///     go back to step 3. If it returns an address it means the stream is
74     ///     connected, go to step 5. If another error is returned something
75     ///     went wrong.
76     ///  5. Now the stream can be used.
77     ///
78     /// This may return a `WouldBlock` in which case the socket connection
79     /// cannot be completed immediately, it usually means there are insufficient
80     /// entries in the routing cache.
81     ///
82     /// [write interest]: Interest::WRITABLE
83     #[cfg(not(target_os = "wasi"))]
connect(addr: SocketAddr) -> io::Result<TcpStream>84     pub fn connect(addr: SocketAddr) -> io::Result<TcpStream> {
85         let socket = new_for_addr(addr)?;
86         #[cfg(unix)]
87         let stream = unsafe { TcpStream::from_raw_fd(socket) };
88         #[cfg(windows)]
89         let stream = unsafe { TcpStream::from_raw_socket(socket as _) };
90         connect(&stream.inner, addr)?;
91         Ok(stream)
92     }
93 
94     /// Creates a new `TcpStream` from a standard `net::TcpStream`.
95     ///
96     /// This function is intended to be used to wrap a TCP stream from the
97     /// standard library in the Mio equivalent. The conversion assumes nothing
98     /// about the underlying stream; it is left up to the user to set it in
99     /// non-blocking mode.
100     ///
101     /// # Note
102     ///
103     /// The TCP stream here will not have `connect` called on it, so it
104     /// should already be connected via some other means (be it manually, or
105     /// the standard library).
from_std(stream: net::TcpStream) -> TcpStream106     pub fn from_std(stream: net::TcpStream) -> TcpStream {
107         TcpStream {
108             inner: IoSource::new(stream),
109         }
110     }
111 
112     /// Returns the socket address of the remote peer of this TCP connection.
peer_addr(&self) -> io::Result<SocketAddr>113     pub fn peer_addr(&self) -> io::Result<SocketAddr> {
114         self.inner.peer_addr()
115     }
116 
117     /// Returns the socket address of the local half of this TCP connection.
local_addr(&self) -> io::Result<SocketAddr>118     pub fn local_addr(&self) -> io::Result<SocketAddr> {
119         self.inner.local_addr()
120     }
121 
122     /// Shuts down the read, write, or both halves of this connection.
123     ///
124     /// This function will cause all pending and future I/O on the specified
125     /// portions to return immediately with an appropriate value (see the
126     /// documentation of `Shutdown`).
shutdown(&self, how: Shutdown) -> io::Result<()>127     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
128         self.inner.shutdown(how)
129     }
130 
131     /// Sets the value of the `TCP_NODELAY` option on this socket.
132     ///
133     /// If set, this option disables the Nagle algorithm. This means that
134     /// segments are always sent as soon as possible, even if there is only a
135     /// small amount of data. When not set, data is buffered until there is a
136     /// sufficient amount to send out, thereby avoiding the frequent sending of
137     /// small packets.
138     ///
139     /// # Notes
140     ///
141     /// On Windows make sure the stream is connected before calling this method,
142     /// by receiving an (writable) event. Trying to set `nodelay` on an
143     /// unconnected `TcpStream` is unspecified behavior.
set_nodelay(&self, nodelay: bool) -> io::Result<()>144     pub fn set_nodelay(&self, nodelay: bool) -> io::Result<()> {
145         self.inner.set_nodelay(nodelay)
146     }
147 
148     /// Gets the value of the `TCP_NODELAY` option on this socket.
149     ///
150     /// For more information about this option, see [`set_nodelay`][link].
151     ///
152     /// [link]: #method.set_nodelay
153     ///
154     /// # Notes
155     ///
156     /// On Windows make sure the stream is connected before calling this method,
157     /// by receiving an (writable) event. Trying to get `nodelay` on an
158     /// unconnected `TcpStream` is unspecified behavior.
nodelay(&self) -> io::Result<bool>159     pub fn nodelay(&self) -> io::Result<bool> {
160         self.inner.nodelay()
161     }
162 
163     /// Sets the value for the `IP_TTL` option on this socket.
164     ///
165     /// This value sets the time-to-live field that is used in every packet sent
166     /// from this socket.
167     ///
168     /// # Notes
169     ///
170     /// On Windows make sure the stream is connected before calling this method,
171     /// by receiving an (writable) event. Trying to set `ttl` on an
172     /// unconnected `TcpStream` is unspecified behavior.
set_ttl(&self, ttl: u32) -> io::Result<()>173     pub fn set_ttl(&self, ttl: u32) -> io::Result<()> {
174         self.inner.set_ttl(ttl)
175     }
176 
177     /// Gets the value of the `IP_TTL` option for this socket.
178     ///
179     /// For more information about this option, see [`set_ttl`][link].
180     ///
181     /// # Notes
182     ///
183     /// On Windows make sure the stream is connected before calling this method,
184     /// by receiving an (writable) event. Trying to get `ttl` on an
185     /// unconnected `TcpStream` is unspecified behavior.
186     ///
187     /// [link]: #method.set_ttl
ttl(&self) -> io::Result<u32>188     pub fn ttl(&self) -> io::Result<u32> {
189         self.inner.ttl()
190     }
191 
192     /// Get the value of the `SO_ERROR` option on this socket.
193     ///
194     /// This will retrieve the stored error in the underlying socket, clearing
195     /// the field in the process. This can be useful for checking errors between
196     /// calls.
take_error(&self) -> io::Result<Option<io::Error>>197     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
198         self.inner.take_error()
199     }
200 
201     /// Receives data on the socket from the remote address to which it is
202     /// connected, without removing that data from the queue. On success,
203     /// returns the number of bytes peeked.
204     ///
205     /// Successive calls return the same data. This is accomplished by passing
206     /// `MSG_PEEK` as a flag to the underlying recv system call.
peek(&self, buf: &mut [u8]) -> io::Result<usize>207     pub fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
208         self.inner.peek(buf)
209     }
210 
211     /// Execute an I/O operation ensuring that the socket receives more events
212     /// if it hits a [`WouldBlock`] error.
213     ///
214     /// # Notes
215     ///
216     /// This method is required to be called for **all** I/O operations to
217     /// ensure the user will receive events once the socket is ready again after
218     /// returning a [`WouldBlock`] error.
219     ///
220     /// [`WouldBlock`]: io::ErrorKind::WouldBlock
221     ///
222     /// # Examples
223     ///
224     #[cfg_attr(unix, doc = "```no_run")]
225     #[cfg_attr(windows, doc = "```ignore")]
226     /// # use std::error::Error;
227     /// #
228     /// # fn main() -> Result<(), Box<dyn Error>> {
229     /// use std::io;
230     /// #[cfg(unix)]
231     /// use std::os::unix::io::AsRawFd;
232     /// #[cfg(windows)]
233     /// use std::os::windows::io::AsRawSocket;
234     /// use mio::net::TcpStream;
235     ///
236     /// let address = "127.0.0.1:8080".parse().unwrap();
237     /// let stream = TcpStream::connect(address)?;
238     ///
239     /// // Wait until the stream is readable...
240     ///
241     /// // Read from the stream using a direct libc call, of course the
242     /// // `io::Read` implementation would be easier to use.
243     /// let mut buf = [0; 512];
244     /// let n = stream.try_io(|| {
245     ///     let buf_ptr = &mut buf as *mut _ as *mut _;
246     ///     #[cfg(unix)]
247     ///     let res = unsafe { libc::recv(stream.as_raw_fd(), buf_ptr, buf.len(), 0) };
248     ///     #[cfg(windows)]
249     ///     let res = unsafe { libc::recvfrom(stream.as_raw_socket() as usize, buf_ptr, buf.len() as i32, 0, std::ptr::null_mut(), std::ptr::null_mut()) };
250     ///     if res != -1 {
251     ///         Ok(res as usize)
252     ///     } else {
253     ///         // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
254     ///         // should return `WouldBlock` error.
255     ///         Err(io::Error::last_os_error())
256     ///     }
257     /// })?;
258     /// eprintln!("read {} bytes", n);
259     /// # Ok(())
260     /// # }
261     /// ```
try_io<F, T>(&self, f: F) -> io::Result<T> where F: FnOnce() -> io::Result<T>,262     pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
263     where
264         F: FnOnce() -> io::Result<T>,
265     {
266         self.inner.do_io(|_| f())
267     }
268 }
269 
270 impl Read for TcpStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>271     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
272         self.inner.do_io(|mut inner| inner.read(buf))
273     }
274 
read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>275     fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
276         self.inner.do_io(|mut inner| inner.read_vectored(bufs))
277     }
278 }
279 
280 impl<'a> Read for &'a TcpStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>281     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
282         self.inner.do_io(|mut inner| inner.read(buf))
283     }
284 
read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>285     fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
286         self.inner.do_io(|mut inner| inner.read_vectored(bufs))
287     }
288 }
289 
290 impl Write for TcpStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>291     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
292         self.inner.do_io(|mut inner| inner.write(buf))
293     }
294 
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>295     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
296         self.inner.do_io(|mut inner| inner.write_vectored(bufs))
297     }
298 
flush(&mut self) -> io::Result<()>299     fn flush(&mut self) -> io::Result<()> {
300         self.inner.do_io(|mut inner| inner.flush())
301     }
302 }
303 
304 impl<'a> Write for &'a TcpStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>305     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
306         self.inner.do_io(|mut inner| inner.write(buf))
307     }
308 
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>309     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
310         self.inner.do_io(|mut inner| inner.write_vectored(bufs))
311     }
312 
flush(&mut self) -> io::Result<()>313     fn flush(&mut self) -> io::Result<()> {
314         self.inner.do_io(|mut inner| inner.flush())
315     }
316 }
317 
318 impl event::Source for TcpStream {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>319     fn register(
320         &mut self,
321         registry: &Registry,
322         token: Token,
323         interests: Interest,
324     ) -> io::Result<()> {
325         self.inner.register(registry, token, interests)
326     }
327 
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>328     fn reregister(
329         &mut self,
330         registry: &Registry,
331         token: Token,
332         interests: Interest,
333     ) -> io::Result<()> {
334         self.inner.reregister(registry, token, interests)
335     }
336 
deregister(&mut self, registry: &Registry) -> io::Result<()>337     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
338         self.inner.deregister(registry)
339     }
340 }
341 
342 impl fmt::Debug for TcpStream {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result343     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
344         self.inner.fmt(f)
345     }
346 }
347 
348 #[cfg(unix)]
349 impl IntoRawFd for TcpStream {
into_raw_fd(self) -> RawFd350     fn into_raw_fd(self) -> RawFd {
351         self.inner.into_inner().into_raw_fd()
352     }
353 }
354 
355 #[cfg(unix)]
356 impl AsRawFd for TcpStream {
as_raw_fd(&self) -> RawFd357     fn as_raw_fd(&self) -> RawFd {
358         self.inner.as_raw_fd()
359     }
360 }
361 
362 #[cfg(unix)]
363 impl FromRawFd for TcpStream {
364     /// Converts a `RawFd` to a `TcpStream`.
365     ///
366     /// # Notes
367     ///
368     /// The caller is responsible for ensuring that the socket is in
369     /// non-blocking mode.
from_raw_fd(fd: RawFd) -> TcpStream370     unsafe fn from_raw_fd(fd: RawFd) -> TcpStream {
371         TcpStream::from_std(FromRawFd::from_raw_fd(fd))
372     }
373 }
374 
375 #[cfg(windows)]
376 impl IntoRawSocket for TcpStream {
into_raw_socket(self) -> RawSocket377     fn into_raw_socket(self) -> RawSocket {
378         self.inner.into_inner().into_raw_socket()
379     }
380 }
381 
382 #[cfg(windows)]
383 impl AsRawSocket for TcpStream {
as_raw_socket(&self) -> RawSocket384     fn as_raw_socket(&self) -> RawSocket {
385         self.inner.as_raw_socket()
386     }
387 }
388 
389 #[cfg(windows)]
390 impl FromRawSocket for TcpStream {
391     /// Converts a `RawSocket` to a `TcpStream`.
392     ///
393     /// # Notes
394     ///
395     /// The caller is responsible for ensuring that the socket is in
396     /// non-blocking mode.
from_raw_socket(socket: RawSocket) -> TcpStream397     unsafe fn from_raw_socket(socket: RawSocket) -> TcpStream {
398         TcpStream::from_std(FromRawSocket::from_raw_socket(socket))
399     }
400 }
401 
402 #[cfg(target_os = "wasi")]
403 impl IntoRawFd for TcpStream {
into_raw_fd(self) -> RawFd404     fn into_raw_fd(self) -> RawFd {
405         self.inner.into_inner().into_raw_fd()
406     }
407 }
408 
409 #[cfg(target_os = "wasi")]
410 impl AsRawFd for TcpStream {
as_raw_fd(&self) -> RawFd411     fn as_raw_fd(&self) -> RawFd {
412         self.inner.as_raw_fd()
413     }
414 }
415 
416 #[cfg(target_os = "wasi")]
417 impl FromRawFd for TcpStream {
418     /// Converts a `RawFd` to a `TcpStream`.
419     ///
420     /// # Notes
421     ///
422     /// The caller is responsible for ensuring that the socket is in
423     /// non-blocking mode.
from_raw_fd(fd: RawFd) -> TcpStream424     unsafe fn from_raw_fd(fd: RawFd) -> TcpStream {
425         TcpStream::from_std(FromRawFd::from_raw_fd(fd))
426     }
427 }
428