• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::io_source::IoSource;
2 use crate::{event, sys, Interest, Registry, Token};
3 
4 use std::fmt;
5 use std::io::{self, IoSlice, IoSliceMut, Read, Write};
6 use std::net::Shutdown;
7 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
8 use std::os::unix::net;
9 use std::path::Path;
10 
11 /// A non-blocking Unix stream socket.
12 pub struct UnixStream {
13     inner: IoSource<net::UnixStream>,
14 }
15 
16 impl UnixStream {
17     /// Connects to the socket named by `path`.
18     ///
19     /// This may return a `WouldBlock` in which case the socket connection
20     /// cannot be completed immediately. Usually it means the backlog is full.
connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream>21     pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
22         sys::uds::stream::connect(path.as_ref()).map(UnixStream::from_std)
23     }
24 
25     /// Creates a new `UnixStream` from a standard `net::UnixStream`.
26     ///
27     /// This function is intended to be used to wrap a Unix stream from the
28     /// standard library in the Mio equivalent. The conversion assumes nothing
29     /// about the underlying stream; it is left up to the user to set it in
30     /// non-blocking mode.
31     ///
32     /// # Note
33     ///
34     /// The Unix stream here will not have `connect` called on it, so it
35     /// should already be connected via some other means (be it manually, or
36     /// the standard library).
from_std(stream: net::UnixStream) -> UnixStream37     pub fn from_std(stream: net::UnixStream) -> UnixStream {
38         UnixStream {
39             inner: IoSource::new(stream),
40         }
41     }
42 
43     /// Creates an unnamed pair of connected sockets.
44     ///
45     /// Returns two `UnixStream`s which are connected to each other.
pair() -> io::Result<(UnixStream, UnixStream)>46     pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
47         sys::uds::stream::pair().map(|(stream1, stream2)| {
48             (UnixStream::from_std(stream1), UnixStream::from_std(stream2))
49         })
50     }
51 
52     /// Returns the socket address of the local half of this connection.
local_addr(&self) -> io::Result<sys::SocketAddr>53     pub fn local_addr(&self) -> io::Result<sys::SocketAddr> {
54         sys::uds::stream::local_addr(&self.inner)
55     }
56 
57     /// Returns the socket address of the remote half of this connection.
peer_addr(&self) -> io::Result<sys::SocketAddr>58     pub fn peer_addr(&self) -> io::Result<sys::SocketAddr> {
59         sys::uds::stream::peer_addr(&self.inner)
60     }
61 
62     /// Returns the value of the `SO_ERROR` option.
take_error(&self) -> io::Result<Option<io::Error>>63     pub fn take_error(&self) -> io::Result<Option<io::Error>> {
64         self.inner.take_error()
65     }
66 
67     /// Shuts down the read, write, or both halves of this connection.
68     ///
69     /// This function will cause all pending and future I/O calls on the
70     /// specified portions to immediately return with an appropriate value
71     /// (see the documentation of `Shutdown`).
shutdown(&self, how: Shutdown) -> io::Result<()>72     pub fn shutdown(&self, how: Shutdown) -> io::Result<()> {
73         self.inner.shutdown(how)
74     }
75 
76     /// Execute an I/O operation ensuring that the socket receives more events
77     /// if it hits a [`WouldBlock`] error.
78     ///
79     /// # Notes
80     ///
81     /// This method is required to be called for **all** I/O operations to
82     /// ensure the user will receive events once the socket is ready again after
83     /// returning a [`WouldBlock`] error.
84     ///
85     /// [`WouldBlock`]: io::ErrorKind::WouldBlock
86     ///
87     /// # Examples
88     ///
89     /// ```
90     /// # use std::error::Error;
91     /// #
92     /// # fn main() -> Result<(), Box<dyn Error>> {
93     /// use std::io;
94     /// use std::os::unix::io::AsRawFd;
95     /// use mio::net::UnixStream;
96     ///
97     /// let (stream1, stream2) = UnixStream::pair()?;
98     ///
99     /// // Wait until the stream is writable...
100     ///
101     /// // Write to the stream using a direct libc call, of course the
102     /// // `io::Write` implementation would be easier to use.
103     /// let buf = b"hello";
104     /// let n = stream1.try_io(|| {
105     ///     let buf_ptr = &buf as *const _ as *const _;
106     ///     let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) };
107     ///     if res != -1 {
108     ///         Ok(res as usize)
109     ///     } else {
110     ///         // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure
111     ///         // should return `WouldBlock` error.
112     ///         Err(io::Error::last_os_error())
113     ///     }
114     /// })?;
115     /// eprintln!("write {} bytes", n);
116     ///
117     /// // Wait until the stream is readable...
118     ///
119     /// // Read from the stream using a direct libc call, of course the
120     /// // `io::Read` implementation would be easier to use.
121     /// let mut buf = [0; 512];
122     /// let n = stream2.try_io(|| {
123     ///     let buf_ptr = &mut buf as *mut _ as *mut _;
124     ///     let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) };
125     ///     if res != -1 {
126     ///         Ok(res as usize)
127     ///     } else {
128     ///         // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure
129     ///         // should return `WouldBlock` error.
130     ///         Err(io::Error::last_os_error())
131     ///     }
132     /// })?;
133     /// eprintln!("read {} bytes", n);
134     /// # Ok(())
135     /// # }
136     /// ```
try_io<F, T>(&self, f: F) -> io::Result<T> where F: FnOnce() -> io::Result<T>,137     pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
138     where
139         F: FnOnce() -> io::Result<T>,
140     {
141         self.inner.do_io(|_| f())
142     }
143 }
144 
145 impl Read for UnixStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>146     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
147         self.inner.do_io(|mut inner| inner.read(buf))
148     }
149 
read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>150     fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
151         self.inner.do_io(|mut inner| inner.read_vectored(bufs))
152     }
153 }
154 
155 impl<'a> Read for &'a UnixStream {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>156     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
157         self.inner.do_io(|mut inner| inner.read(buf))
158     }
159 
read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>160     fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
161         self.inner.do_io(|mut inner| inner.read_vectored(bufs))
162     }
163 }
164 
165 impl Write for UnixStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>166     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
167         self.inner.do_io(|mut inner| inner.write(buf))
168     }
169 
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>170     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
171         self.inner.do_io(|mut inner| inner.write_vectored(bufs))
172     }
173 
flush(&mut self) -> io::Result<()>174     fn flush(&mut self) -> io::Result<()> {
175         self.inner.do_io(|mut inner| inner.flush())
176     }
177 }
178 
179 impl<'a> Write for &'a UnixStream {
write(&mut self, buf: &[u8]) -> io::Result<usize>180     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
181         self.inner.do_io(|mut inner| inner.write(buf))
182     }
183 
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>184     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
185         self.inner.do_io(|mut inner| inner.write_vectored(bufs))
186     }
187 
flush(&mut self) -> io::Result<()>188     fn flush(&mut self) -> io::Result<()> {
189         self.inner.do_io(|mut inner| inner.flush())
190     }
191 }
192 
193 impl event::Source for UnixStream {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>194     fn register(
195         &mut self,
196         registry: &Registry,
197         token: Token,
198         interests: Interest,
199     ) -> io::Result<()> {
200         self.inner.register(registry, token, interests)
201     }
202 
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>203     fn reregister(
204         &mut self,
205         registry: &Registry,
206         token: Token,
207         interests: Interest,
208     ) -> io::Result<()> {
209         self.inner.reregister(registry, token, interests)
210     }
211 
deregister(&mut self, registry: &Registry) -> io::Result<()>212     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
213         self.inner.deregister(registry)
214     }
215 }
216 
217 impl fmt::Debug for UnixStream {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result218     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219         self.inner.fmt(f)
220     }
221 }
222 
223 impl IntoRawFd for UnixStream {
into_raw_fd(self) -> RawFd224     fn into_raw_fd(self) -> RawFd {
225         self.inner.into_inner().into_raw_fd()
226     }
227 }
228 
229 impl AsRawFd for UnixStream {
as_raw_fd(&self) -> RawFd230     fn as_raw_fd(&self) -> RawFd {
231         self.inner.as_raw_fd()
232     }
233 }
234 
235 impl FromRawFd for UnixStream {
236     /// Converts a `RawFd` to a `UnixStream`.
237     ///
238     /// # Notes
239     ///
240     /// The caller is responsible for ensuring that the socket is in
241     /// non-blocking mode.
from_raw_fd(fd: RawFd) -> UnixStream242     unsafe fn from_raw_fd(fd: RawFd) -> UnixStream {
243         UnixStream::from_std(FromRawFd::from_raw_fd(fd))
244     }
245 }
246