//! Unix pipe. //! //! See the [`new`] function for documentation. use std::fs::File; use std::io::{self, IoSlice, IoSliceMut, Read, Write}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::process::{ChildStderr, ChildStdin, ChildStdout}; use crate::io_source::IoSource; use crate::{event, Interest, Registry, Token}; /// Create a new non-blocking Unix pipe. /// /// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as /// inter-process or thread communication channel. /// /// This channel may be created before forking the process and then one end used /// in each process, e.g. the parent process has the sending end to send command /// to the child process. /// /// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html /// /// # Events /// /// The [`Sender`] can be registered with [`WRITABLE`] interest to receive /// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is /// written to the `Sender` the `Receiver` will receive an [readable event]. /// /// In addition to those events, events will also be generated if the other side /// is dropped. To check if the `Sender` is dropped you'll need to check /// [`is_read_closed`] on events for the `Receiver`, if it returns true the /// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it /// returns true the `Receiver` was dropped. Also see the second example below. /// /// [`WRITABLE`]: Interest::WRITABLE /// [writable events]: event::Event::is_writable /// [`READABLE`]: Interest::READABLE /// [readable event]: event::Event::is_readable /// [`is_read_closed`]: event::Event::is_read_closed /// [`is_write_closed`]: event::Event::is_write_closed /// /// # Deregistering /// /// Both `Sender` and `Receiver` will deregister themselves when dropped, /// **iff** the file descriptors are not duplicated (via [`dup(2)`]). /// /// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html /// /// # Examples /// /// Simple example that writes data into the sending end and read it from the /// receiving end. /// /// ``` /// use std::io::{self, Read, Write}; /// /// use mio::{Poll, Events, Interest, Token}; /// use mio::unix::pipe; /// /// // Unique tokens for the two ends of the channel. /// const PIPE_RECV: Token = Token(0); /// const PIPE_SEND: Token = Token(1); /// /// # fn main() -> io::Result<()> { /// // Create our `Poll` instance and the `Events` container. /// let mut poll = Poll::new()?; /// let mut events = Events::with_capacity(8); /// /// // Create a new pipe. /// let (mut sender, mut receiver) = pipe::new()?; /// /// // Register both ends of the channel. /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; /// /// const MSG: &[u8; 11] = b"Hello world"; /// /// loop { /// poll.poll(&mut events, None)?; /// /// for event in events.iter() { /// match event.token() { /// PIPE_SEND => sender.write(MSG) /// .and_then(|n| if n != MSG.len() { /// // We'll consider a short write an error in this /// // example. NOTE: we can't use `write_all` with /// // non-blocking I/O. /// Err(io::ErrorKind::WriteZero.into()) /// } else { /// Ok(()) /// })?, /// PIPE_RECV => { /// let mut buf = [0; 11]; /// let n = receiver.read(&mut buf)?; /// println!("received: {:?}", &buf[0..n]); /// assert_eq!(n, MSG.len()); /// assert_eq!(&buf, &*MSG); /// return Ok(()); /// }, /// _ => unreachable!(), /// } /// } /// } /// # } /// ``` /// /// Example that receives an event once the `Sender` is dropped. /// /// ``` /// # use std::io; /// # /// # use mio::{Poll, Events, Interest, Token}; /// # use mio::unix::pipe; /// # /// # const PIPE_RECV: Token = Token(0); /// # const PIPE_SEND: Token = Token(1); /// # /// # fn main() -> io::Result<()> { /// // Same setup as in the example above. /// let mut poll = Poll::new()?; /// let mut events = Events::with_capacity(8); /// /// let (mut sender, mut receiver) = pipe::new()?; /// /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?; /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?; /// /// // Drop the sender. /// drop(sender); /// /// poll.poll(&mut events, None)?; /// /// for event in events.iter() { /// match event.token() { /// PIPE_RECV if event.is_read_closed() => { /// // Detected that the sender was dropped. /// println!("Sender dropped!"); /// return Ok(()); /// }, /// _ => unreachable!(), /// } /// } /// # unreachable!(); /// # } /// ``` pub fn new() -> io::Result<(Sender, Receiver)> { let mut fds: [RawFd; 2] = [-1, -1]; #[cfg(any( target_os = "android", target_os = "dragonfly", target_os = "freebsd", target_os = "linux", target_os = "netbsd", target_os = "openbsd", target_os = "illumos", ))] unsafe { if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 { return Err(io::Error::last_os_error()); } } #[cfg(any(target_os = "ios", target_os = "macos", target_os = "solaris"))] unsafe { // For platforms that don't have `pipe2(2)` we need to manually set the // correct flags on the file descriptor. if libc::pipe(fds.as_mut_ptr()) != 0 { return Err(io::Error::last_os_error()); } for fd in &fds { if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0 || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0 { let err = io::Error::last_os_error(); // Don't leak file descriptors. Can't handle error though. let _ = libc::close(fds[0]); let _ = libc::close(fds[1]); return Err(err); } } } #[cfg(not(any( target_os = "android", target_os = "dragonfly", target_os = "freebsd", target_os = "linux", target_os = "netbsd", target_os = "openbsd", target_os = "ios", target_os = "macos", target_os = "solaris", target_os = "illumos", )))] compile_error!("unsupported target for `mio::unix::pipe`"); // Safety: we just initialised the `fds` above. let r = unsafe { Receiver::from_raw_fd(fds[0]) }; let w = unsafe { Sender::from_raw_fd(fds[1]) }; Ok((w, r)) } /// Sending end of an Unix pipe. /// /// See [`new`] for documentation, including examples. #[derive(Debug)] pub struct Sender { inner: IoSource, } impl Sender { /// Set the `Sender` into or out of non-blocking mode. pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { set_nonblocking(self.inner.as_raw_fd(), nonblocking) } } impl event::Source for Sender { fn register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()> { self.inner.register(registry, token, interests) } fn reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()> { self.inner.reregister(registry, token, interests) } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { self.inner.deregister(registry) } } impl Write for Sender { fn write(&mut self, buf: &[u8]) -> io::Result { self.inner.do_io(|sender| (&*sender).write(buf)) } fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { self.inner.do_io(|sender| (&*sender).write_vectored(bufs)) } fn flush(&mut self) -> io::Result<()> { self.inner.do_io(|sender| (&*sender).flush()) } } impl Write for &Sender { fn write(&mut self, buf: &[u8]) -> io::Result { self.inner.do_io(|sender| (&*sender).write(buf)) } fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result { self.inner.do_io(|sender| (&*sender).write_vectored(bufs)) } fn flush(&mut self) -> io::Result<()> { self.inner.do_io(|sender| (&*sender).flush()) } } /// # Notes /// /// The underlying pipe is **not** set to non-blocking. impl From for Sender { fn from(stdin: ChildStdin) -> Sender { // Safety: `ChildStdin` is guaranteed to be a valid file descriptor. unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) } } } impl FromRawFd for Sender { unsafe fn from_raw_fd(fd: RawFd) -> Sender { Sender { inner: IoSource::new(File::from_raw_fd(fd)), } } } impl AsRawFd for Sender { fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() } } impl IntoRawFd for Sender { fn into_raw_fd(self) -> RawFd { self.inner.into_inner().into_raw_fd() } } /// Receiving end of an Unix pipe. /// /// See [`new`] for documentation, including examples. #[derive(Debug)] pub struct Receiver { inner: IoSource, } impl Receiver { /// Set the `Receiver` into or out of non-blocking mode. pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { set_nonblocking(self.inner.as_raw_fd(), nonblocking) } } impl event::Source for Receiver { fn register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()> { self.inner.register(registry, token, interests) } fn reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()> { self.inner.reregister(registry, token, interests) } fn deregister(&mut self, registry: &Registry) -> io::Result<()> { self.inner.deregister(registry) } } impl Read for Receiver { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.do_io(|sender| (&*sender).read(buf)) } fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result { self.inner.do_io(|sender| (&*sender).read_vectored(bufs)) } } impl Read for &Receiver { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.inner.do_io(|sender| (&*sender).read(buf)) } fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result { self.inner.do_io(|sender| (&*sender).read_vectored(bufs)) } } /// # Notes /// /// The underlying pipe is **not** set to non-blocking. impl From for Receiver { fn from(stdout: ChildStdout) -> Receiver { // Safety: `ChildStdout` is guaranteed to be a valid file descriptor. unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) } } } /// # Notes /// /// The underlying pipe is **not** set to non-blocking. impl From for Receiver { fn from(stderr: ChildStderr) -> Receiver { // Safety: `ChildStderr` is guaranteed to be a valid file descriptor. unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) } } } impl FromRawFd for Receiver { unsafe fn from_raw_fd(fd: RawFd) -> Receiver { Receiver { inner: IoSource::new(File::from_raw_fd(fd)), } } } impl AsRawFd for Receiver { fn as_raw_fd(&self) -> RawFd { self.inner.as_raw_fd() } } impl IntoRawFd for Receiver { fn into_raw_fd(self) -> RawFd { self.inner.into_inner().into_raw_fd() } } #[cfg(not(target_os = "illumos"))] fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { let value = nonblocking as libc::c_int; if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 { Err(io::Error::last_os_error()) } else { Ok(()) } } #[cfg(target_os = "illumos")] fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> { let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; if flags < 0 { return Err(io::Error::last_os_error()); } let nflags = if nonblocking { flags | libc::O_NONBLOCK } else { flags & !libc::O_NONBLOCK }; if flags != nflags { if unsafe { libc::fcntl(fd, libc::F_SETFL, nflags) } < 0 { return Err(io::Error::last_os_error()); } } Ok(()) }