1 use crate::io_source::IoSource; 2 use crate::{event, sys, Interest, Registry, Token}; 3 4 use std::fmt; 5 use std::io::{self, IoSlice, IoSliceMut, Read, Write}; 6 use std::net::Shutdown; 7 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; 8 use std::os::unix::net; 9 use std::path::Path; 10 11 /// A non-blocking Unix stream socket. 12 pub struct UnixStream { 13 inner: IoSource<net::UnixStream>, 14 } 15 16 impl UnixStream { 17 /// Connects to the socket named by `path`. 18 /// 19 /// This may return a `WouldBlock` in which case the socket connection 20 /// cannot be completed immediately. Usually it means the backlog is full. connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream>21 pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> { 22 sys::uds::stream::connect(path.as_ref()).map(UnixStream::from_std) 23 } 24 25 /// Creates a new `UnixStream` from a standard `net::UnixStream`. 26 /// 27 /// This function is intended to be used to wrap a Unix stream from the 28 /// standard library in the Mio equivalent. The conversion assumes nothing 29 /// about the underlying stream; it is left up to the user to set it in 30 /// non-blocking mode. 31 /// 32 /// # Note 33 /// 34 /// The Unix stream here will not have `connect` called on it, so it 35 /// should already be connected via some other means (be it manually, or 36 /// the standard library). from_std(stream: net::UnixStream) -> UnixStream37 pub fn from_std(stream: net::UnixStream) -> UnixStream { 38 UnixStream { 39 inner: IoSource::new(stream), 40 } 41 } 42 43 /// Creates an unnamed pair of connected sockets. 44 /// 45 /// Returns two `UnixStream`s which are connected to each other. pair() -> io::Result<(UnixStream, UnixStream)>46 pub fn pair() -> io::Result<(UnixStream, UnixStream)> { 47 sys::uds::stream::pair().map(|(stream1, stream2)| { 48 (UnixStream::from_std(stream1), UnixStream::from_std(stream2)) 49 }) 50 } 51 52 /// Returns the socket address of the local half of this connection. local_addr(&self) -> io::Result<sys::SocketAddr>53 pub fn local_addr(&self) -> io::Result<sys::SocketAddr> { 54 sys::uds::stream::local_addr(&self.inner) 55 } 56 57 /// Returns the socket address of the remote half of this connection. peer_addr(&self) -> io::Result<sys::SocketAddr>58 pub fn peer_addr(&self) -> io::Result<sys::SocketAddr> { 59 sys::uds::stream::peer_addr(&self.inner) 60 } 61 62 /// Returns the value of the `SO_ERROR` option. take_error(&self) -> io::Result<Option<io::Error>>63 pub fn take_error(&self) -> io::Result<Option<io::Error>> { 64 self.inner.take_error() 65 } 66 67 /// Shuts down the read, write, or both halves of this connection. 68 /// 69 /// This function will cause all pending and future I/O calls on the 70 /// specified portions to immediately return with an appropriate value 71 /// (see the documentation of `Shutdown`). shutdown(&self, how: Shutdown) -> io::Result<()>72 pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { 73 self.inner.shutdown(how) 74 } 75 76 /// Execute an I/O operation ensuring that the socket receives more events 77 /// if it hits a [`WouldBlock`] error. 78 /// 79 /// # Notes 80 /// 81 /// This method is required to be called for **all** I/O operations to 82 /// ensure the user will receive events once the socket is ready again after 83 /// returning a [`WouldBlock`] error. 84 /// 85 /// [`WouldBlock`]: io::ErrorKind::WouldBlock 86 /// 87 /// # Examples 88 /// 89 /// ``` 90 /// # use std::error::Error; 91 /// # 92 /// # fn main() -> Result<(), Box<dyn Error>> { 93 /// use std::io; 94 /// use std::os::unix::io::AsRawFd; 95 /// use mio::net::UnixStream; 96 /// 97 /// let (stream1, stream2) = UnixStream::pair()?; 98 /// 99 /// // Wait until the stream is writable... 100 /// 101 /// // Write to the stream using a direct libc call, of course the 102 /// // `io::Write` implementation would be easier to use. 103 /// let buf = b"hello"; 104 /// let n = stream1.try_io(|| { 105 /// let buf_ptr = &buf as *const _ as *const _; 106 /// let res = unsafe { libc::send(stream1.as_raw_fd(), buf_ptr, buf.len(), 0) }; 107 /// if res != -1 { 108 /// Ok(res as usize) 109 /// } else { 110 /// // If EAGAIN or EWOULDBLOCK is set by libc::send, the closure 111 /// // should return `WouldBlock` error. 112 /// Err(io::Error::last_os_error()) 113 /// } 114 /// })?; 115 /// eprintln!("write {} bytes", n); 116 /// 117 /// // Wait until the stream is readable... 118 /// 119 /// // Read from the stream using a direct libc call, of course the 120 /// // `io::Read` implementation would be easier to use. 121 /// let mut buf = [0; 512]; 122 /// let n = stream2.try_io(|| { 123 /// let buf_ptr = &mut buf as *mut _ as *mut _; 124 /// let res = unsafe { libc::recv(stream2.as_raw_fd(), buf_ptr, buf.len(), 0) }; 125 /// if res != -1 { 126 /// Ok(res as usize) 127 /// } else { 128 /// // If EAGAIN or EWOULDBLOCK is set by libc::recv, the closure 129 /// // should return `WouldBlock` error. 130 /// Err(io::Error::last_os_error()) 131 /// } 132 /// })?; 133 /// eprintln!("read {} bytes", n); 134 /// # Ok(()) 135 /// # } 136 /// ``` try_io<F, T>(&self, f: F) -> io::Result<T> where F: FnOnce() -> io::Result<T>,137 pub fn try_io<F, T>(&self, f: F) -> io::Result<T> 138 where 139 F: FnOnce() -> io::Result<T>, 140 { 141 self.inner.do_io(|_| f()) 142 } 143 } 144 145 impl Read for UnixStream { read(&mut self, buf: &mut [u8]) -> io::Result<usize>146 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 147 self.inner.do_io(|mut inner| inner.read(buf)) 148 } 149 read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>150 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { 151 self.inner.do_io(|mut inner| inner.read_vectored(bufs)) 152 } 153 } 154 155 impl<'a> Read for &'a UnixStream { read(&mut self, buf: &mut [u8]) -> io::Result<usize>156 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { 157 self.inner.do_io(|mut inner| inner.read(buf)) 158 } 159 read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>160 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> { 161 self.inner.do_io(|mut inner| inner.read_vectored(bufs)) 162 } 163 } 164 165 impl Write for UnixStream { write(&mut self, buf: &[u8]) -> io::Result<usize>166 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 167 self.inner.do_io(|mut inner| inner.write(buf)) 168 } 169 write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>170 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { 171 self.inner.do_io(|mut inner| inner.write_vectored(bufs)) 172 } 173 flush(&mut self) -> io::Result<()>174 fn flush(&mut self) -> io::Result<()> { 175 self.inner.do_io(|mut inner| inner.flush()) 176 } 177 } 178 179 impl<'a> Write for &'a UnixStream { write(&mut self, buf: &[u8]) -> io::Result<usize>180 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 181 self.inner.do_io(|mut inner| inner.write(buf)) 182 } 183 write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>184 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> { 185 self.inner.do_io(|mut inner| inner.write_vectored(bufs)) 186 } 187 flush(&mut self) -> io::Result<()>188 fn flush(&mut self) -> io::Result<()> { 189 self.inner.do_io(|mut inner| inner.flush()) 190 } 191 } 192 193 impl event::Source for UnixStream { register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>194 fn register( 195 &mut self, 196 registry: &Registry, 197 token: Token, 198 interests: Interest, 199 ) -> io::Result<()> { 200 self.inner.register(registry, token, interests) 201 } 202 reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>203 fn reregister( 204 &mut self, 205 registry: &Registry, 206 token: Token, 207 interests: Interest, 208 ) -> io::Result<()> { 209 self.inner.reregister(registry, token, interests) 210 } 211 deregister(&mut self, registry: &Registry) -> io::Result<()>212 fn deregister(&mut self, registry: &Registry) -> io::Result<()> { 213 self.inner.deregister(registry) 214 } 215 } 216 217 impl fmt::Debug for UnixStream { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 219 self.inner.fmt(f) 220 } 221 } 222 223 impl IntoRawFd for UnixStream { into_raw_fd(self) -> RawFd224 fn into_raw_fd(self) -> RawFd { 225 self.inner.into_inner().into_raw_fd() 226 } 227 } 228 229 impl AsRawFd for UnixStream { as_raw_fd(&self) -> RawFd230 fn as_raw_fd(&self) -> RawFd { 231 self.inner.as_raw_fd() 232 } 233 } 234 235 impl FromRawFd for UnixStream { 236 /// Converts a `RawFd` to a `UnixStream`. 237 /// 238 /// # Notes 239 /// 240 /// The caller is responsible for ensuring that the socket is in 241 /// non-blocking mode. from_raw_fd(fd: RawFd) -> UnixStream242 unsafe fn from_raw_fd(fd: RawFd) -> UnixStream { 243 UnixStream::from_std(FromRawFd::from_raw_fd(fd)) 244 } 245 } 246