• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Unix pipe.
2 //!
3 //! See the [`new`] function for documentation.
4 
5 use std::fs::File;
6 use std::io::{self, IoSlice, IoSliceMut, Read, Write};
7 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
8 use std::process::{ChildStderr, ChildStdin, ChildStdout};
9 
10 use crate::io_source::IoSource;
11 use crate::{event, Interest, Registry, Token};
12 
13 /// Create a new non-blocking Unix pipe.
14 ///
15 /// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
16 /// inter-process or thread communication channel.
17 ///
18 /// This channel may be created before forking the process and then one end used
19 /// in each process, e.g. the parent process has the sending end to send command
20 /// to the child process.
21 ///
22 /// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
23 ///
24 /// # Events
25 ///
26 /// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
27 /// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
28 /// written to the `Sender` the `Receiver` will receive an [readable event].
29 ///
30 /// In addition to those events, events will also be generated if the other side
31 /// is dropped. To check if the `Sender` is dropped you'll need to check
32 /// [`is_read_closed`] on events for the `Receiver`, if it returns true the
33 /// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
34 /// returns true the `Receiver` was dropped. Also see the second example below.
35 ///
36 /// [`WRITABLE`]: Interest::WRITABLE
37 /// [writable events]: event::Event::is_writable
38 /// [`READABLE`]: Interest::READABLE
39 /// [readable event]: event::Event::is_readable
40 /// [`is_read_closed`]: event::Event::is_read_closed
41 /// [`is_write_closed`]: event::Event::is_write_closed
42 ///
43 /// # Deregistering
44 ///
45 /// Both `Sender` and `Receiver` will deregister themselves when dropped,
46 /// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
47 ///
48 /// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
49 ///
50 /// # Examples
51 ///
52 /// Simple example that writes data into the sending end and read it from the
53 /// receiving end.
54 ///
55 /// ```
56 /// use std::io::{self, Read, Write};
57 ///
58 /// use mio::{Poll, Events, Interest, Token};
59 /// use mio::unix::pipe;
60 ///
61 /// // Unique tokens for the two ends of the channel.
62 /// const PIPE_RECV: Token = Token(0);
63 /// const PIPE_SEND: Token = Token(1);
64 ///
65 /// # fn main() -> io::Result<()> {
66 /// // Create our `Poll` instance and the `Events` container.
67 /// let mut poll = Poll::new()?;
68 /// let mut events = Events::with_capacity(8);
69 ///
70 /// // Create a new pipe.
71 /// let (mut sender, mut receiver) = pipe::new()?;
72 ///
73 /// // Register both ends of the channel.
74 /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
75 /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
76 ///
77 /// const MSG: &[u8; 11] = b"Hello world";
78 ///
79 /// loop {
80 ///     poll.poll(&mut events, None)?;
81 ///
82 ///     for event in events.iter() {
83 ///         match event.token() {
84 ///             PIPE_SEND => sender.write(MSG)
85 ///                 .and_then(|n| if n != MSG.len() {
86 ///                         // We'll consider a short write an error in this
87 ///                         // example. NOTE: we can't use `write_all` with
88 ///                         // non-blocking I/O.
89 ///                         Err(io::ErrorKind::WriteZero.into())
90 ///                     } else {
91 ///                         Ok(())
92 ///                     })?,
93 ///             PIPE_RECV => {
94 ///                 let mut buf = [0; 11];
95 ///                 let n = receiver.read(&mut buf)?;
96 ///                 println!("received: {:?}", &buf[0..n]);
97 ///                 assert_eq!(n, MSG.len());
98 ///                 assert_eq!(&buf, &*MSG);
99 ///                 return Ok(());
100 ///             },
101 ///             _ => unreachable!(),
102 ///         }
103 ///     }
104 /// }
105 /// # }
106 /// ```
107 ///
108 /// Example that receives an event once the `Sender` is dropped.
109 ///
110 /// ```
111 /// # use std::io;
112 /// #
113 /// # use mio::{Poll, Events, Interest, Token};
114 /// # use mio::unix::pipe;
115 /// #
116 /// # const PIPE_RECV: Token = Token(0);
117 /// # const PIPE_SEND: Token = Token(1);
118 /// #
119 /// # fn main() -> io::Result<()> {
120 /// // Same setup as in the example above.
121 /// let mut poll = Poll::new()?;
122 /// let mut events = Events::with_capacity(8);
123 ///
124 /// let (mut sender, mut receiver) = pipe::new()?;
125 ///
126 /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
127 /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
128 ///
129 /// // Drop the sender.
130 /// drop(sender);
131 ///
132 /// poll.poll(&mut events, None)?;
133 ///
134 /// for event in events.iter() {
135 ///     match event.token() {
136 ///         PIPE_RECV if event.is_read_closed() => {
137 ///             // Detected that the sender was dropped.
138 ///             println!("Sender dropped!");
139 ///             return Ok(());
140 ///         },
141 ///         _ => unreachable!(),
142 ///     }
143 /// }
144 /// # unreachable!();
145 /// # }
146 /// ```
new() -> io::Result<(Sender, Receiver)>147 pub fn new() -> io::Result<(Sender, Receiver)> {
148     let mut fds: [RawFd; 2] = [-1, -1];
149 
150     #[cfg(any(
151         target_os = "android",
152         target_os = "dragonfly",
153         target_os = "freebsd",
154         target_os = "linux",
155         target_os = "netbsd",
156         target_os = "openbsd",
157         target_os = "illumos",
158         target_os = "redox",
159     ))]
160     unsafe {
161         if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
162             return Err(io::Error::last_os_error());
163         }
164     }
165 
166     #[cfg(any(target_os = "ios", target_os = "macos"))]
167     unsafe {
168         // For platforms that don't have `pipe2(2)` we need to manually set the
169         // correct flags on the file descriptor.
170         if libc::pipe(fds.as_mut_ptr()) != 0 {
171             return Err(io::Error::last_os_error());
172         }
173 
174         for fd in &fds {
175             if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
176                 || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
177             {
178                 let err = io::Error::last_os_error();
179                 // Don't leak file descriptors. Can't handle closing error though.
180                 let _ = libc::close(fds[0]);
181                 let _ = libc::close(fds[1]);
182                 return Err(err);
183             }
184         }
185     }
186 
187     #[cfg(not(any(
188         target_os = "android",
189         target_os = "dragonfly",
190         target_os = "freebsd",
191         target_os = "illumos",
192         target_os = "ios",
193         target_os = "linux",
194         target_os = "macos",
195         target_os = "netbsd",
196         target_os = "openbsd",
197         target_os = "redox",
198     )))]
199     compile_error!("unsupported target for `mio::unix::pipe`");
200 
201     // SAFETY: we just initialised the `fds` above.
202     let r = unsafe { Receiver::from_raw_fd(fds[0]) };
203     let w = unsafe { Sender::from_raw_fd(fds[1]) };
204 
205     Ok((w, r))
206 }
207 
208 /// Sending end of an Unix pipe.
209 ///
210 /// See [`new`] for documentation, including examples.
211 #[derive(Debug)]
212 pub struct Sender {
213     inner: IoSource<File>,
214 }
215 
216 impl Sender {
217     /// Set the `Sender` into or out of non-blocking mode.
set_nonblocking(&self, nonblocking: bool) -> io::Result<()>218     pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
219         set_nonblocking(self.inner.as_raw_fd(), nonblocking)
220     }
221 
222     /// Execute an I/O operation ensuring that the socket receives more events
223     /// if it hits a [`WouldBlock`] error.
224     ///
225     /// # Notes
226     ///
227     /// This method is required to be called for **all** I/O operations to
228     /// ensure the user will receive events once the socket is ready again after
229     /// returning a [`WouldBlock`] error.
230     ///
231     /// [`WouldBlock`]: io::ErrorKind::WouldBlock
232     ///
233     /// # Examples
234     ///
235     /// ```
236     /// # use std::error::Error;
237     /// #
238     /// # fn main() -> Result<(), Box<dyn Error>> {
239     /// use std::io;
240     /// use std::os::unix::io::AsRawFd;
241     /// use mio::unix::pipe;
242     ///
243     /// let (sender, receiver) = pipe::new()?;
244     ///
245     /// // Wait until the sender is writable...
246     ///
247     /// // Write to the sender using a direct libc call, of course the
248     /// // `io::Write` implementation would be easier to use.
249     /// let buf = b"hello";
250     /// let n = sender.try_io(|| {
251     ///     let buf_ptr = &buf as *const _ as *const _;
252     ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
253     ///     if res != -1 {
254     ///         Ok(res as usize)
255     ///     } else {
256     ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
257     ///         // should return `WouldBlock` error.
258     ///         Err(io::Error::last_os_error())
259     ///     }
260     /// })?;
261     /// eprintln!("write {} bytes", n);
262     ///
263     /// // Wait until the receiver is readable...
264     ///
265     /// // Read from the receiver using a direct libc call, of course the
266     /// // `io::Read` implementation would be easier to use.
267     /// let mut buf = [0; 512];
268     /// let n = receiver.try_io(|| {
269     ///     let buf_ptr = &mut buf as *mut _ as *mut _;
270     ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
271     ///     if res != -1 {
272     ///         Ok(res as usize)
273     ///     } else {
274     ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
275     ///         // should return `WouldBlock` error.
276     ///         Err(io::Error::last_os_error())
277     ///     }
278     /// })?;
279     /// eprintln!("read {} bytes", n);
280     /// # Ok(())
281     /// # }
282     /// ```
try_io<F, T>(&self, f: F) -> io::Result<T> where F: FnOnce() -> io::Result<T>,283     pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
284     where
285         F: FnOnce() -> io::Result<T>,
286     {
287         self.inner.do_io(|_| f())
288     }
289 }
290 
291 impl event::Source for Sender {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>292     fn register(
293         &mut self,
294         registry: &Registry,
295         token: Token,
296         interests: Interest,
297     ) -> io::Result<()> {
298         self.inner.register(registry, token, interests)
299     }
300 
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>301     fn reregister(
302         &mut self,
303         registry: &Registry,
304         token: Token,
305         interests: Interest,
306     ) -> io::Result<()> {
307         self.inner.reregister(registry, token, interests)
308     }
309 
deregister(&mut self, registry: &Registry) -> io::Result<()>310     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
311         self.inner.deregister(registry)
312     }
313 }
314 
315 impl Write for Sender {
write(&mut self, buf: &[u8]) -> io::Result<usize>316     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
317         self.inner.do_io(|mut sender| sender.write(buf))
318     }
319 
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>320     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
321         self.inner.do_io(|mut sender| sender.write_vectored(bufs))
322     }
323 
flush(&mut self) -> io::Result<()>324     fn flush(&mut self) -> io::Result<()> {
325         self.inner.do_io(|mut sender| sender.flush())
326     }
327 }
328 
329 impl Write for &Sender {
write(&mut self, buf: &[u8]) -> io::Result<usize>330     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
331         self.inner.do_io(|mut sender| sender.write(buf))
332     }
333 
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>334     fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
335         self.inner.do_io(|mut sender| sender.write_vectored(bufs))
336     }
337 
flush(&mut self) -> io::Result<()>338     fn flush(&mut self) -> io::Result<()> {
339         self.inner.do_io(|mut sender| sender.flush())
340     }
341 }
342 
343 /// # Notes
344 ///
345 /// The underlying pipe is **not** set to non-blocking.
346 impl From<ChildStdin> for Sender {
from(stdin: ChildStdin) -> Sender347     fn from(stdin: ChildStdin) -> Sender {
348         // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
349         unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
350     }
351 }
352 
353 impl FromRawFd for Sender {
from_raw_fd(fd: RawFd) -> Sender354     unsafe fn from_raw_fd(fd: RawFd) -> Sender {
355         Sender {
356             inner: IoSource::new(File::from_raw_fd(fd)),
357         }
358     }
359 }
360 
361 impl AsRawFd for Sender {
as_raw_fd(&self) -> RawFd362     fn as_raw_fd(&self) -> RawFd {
363         self.inner.as_raw_fd()
364     }
365 }
366 
367 impl IntoRawFd for Sender {
into_raw_fd(self) -> RawFd368     fn into_raw_fd(self) -> RawFd {
369         self.inner.into_inner().into_raw_fd()
370     }
371 }
372 
373 /// Receiving end of an Unix pipe.
374 ///
375 /// See [`new`] for documentation, including examples.
376 #[derive(Debug)]
377 pub struct Receiver {
378     inner: IoSource<File>,
379 }
380 
381 impl Receiver {
382     /// Set the `Receiver` into or out of non-blocking mode.
set_nonblocking(&self, nonblocking: bool) -> io::Result<()>383     pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
384         set_nonblocking(self.inner.as_raw_fd(), nonblocking)
385     }
386 
387     /// Execute an I/O operation ensuring that the socket receives more events
388     /// if it hits a [`WouldBlock`] error.
389     ///
390     /// # Notes
391     ///
392     /// This method is required to be called for **all** I/O operations to
393     /// ensure the user will receive events once the socket is ready again after
394     /// returning a [`WouldBlock`] error.
395     ///
396     /// [`WouldBlock`]: io::ErrorKind::WouldBlock
397     ///
398     /// # Examples
399     ///
400     /// ```
401     /// # use std::error::Error;
402     /// #
403     /// # fn main() -> Result<(), Box<dyn Error>> {
404     /// use std::io;
405     /// use std::os::unix::io::AsRawFd;
406     /// use mio::unix::pipe;
407     ///
408     /// let (sender, receiver) = pipe::new()?;
409     ///
410     /// // Wait until the sender is writable...
411     ///
412     /// // Write to the sender using a direct libc call, of course the
413     /// // `io::Write` implementation would be easier to use.
414     /// let buf = b"hello";
415     /// let n = sender.try_io(|| {
416     ///     let buf_ptr = &buf as *const _ as *const _;
417     ///     let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
418     ///     if res != -1 {
419     ///         Ok(res as usize)
420     ///     } else {
421     ///         // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
422     ///         // should return `WouldBlock` error.
423     ///         Err(io::Error::last_os_error())
424     ///     }
425     /// })?;
426     /// eprintln!("write {} bytes", n);
427     ///
428     /// // Wait until the receiver is readable...
429     ///
430     /// // Read from the receiver using a direct libc call, of course the
431     /// // `io::Read` implementation would be easier to use.
432     /// let mut buf = [0; 512];
433     /// let n = receiver.try_io(|| {
434     ///     let buf_ptr = &mut buf as *mut _ as *mut _;
435     ///     let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
436     ///     if res != -1 {
437     ///         Ok(res as usize)
438     ///     } else {
439     ///         // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
440     ///         // should return `WouldBlock` error.
441     ///         Err(io::Error::last_os_error())
442     ///     }
443     /// })?;
444     /// eprintln!("read {} bytes", n);
445     /// # Ok(())
446     /// # }
447     /// ```
try_io<F, T>(&self, f: F) -> io::Result<T> where F: FnOnce() -> io::Result<T>,448     pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
449     where
450         F: FnOnce() -> io::Result<T>,
451     {
452         self.inner.do_io(|_| f())
453     }
454 }
455 
456 impl event::Source for Receiver {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>457     fn register(
458         &mut self,
459         registry: &Registry,
460         token: Token,
461         interests: Interest,
462     ) -> io::Result<()> {
463         self.inner.register(registry, token, interests)
464     }
465 
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>466     fn reregister(
467         &mut self,
468         registry: &Registry,
469         token: Token,
470         interests: Interest,
471     ) -> io::Result<()> {
472         self.inner.reregister(registry, token, interests)
473     }
474 
deregister(&mut self, registry: &Registry) -> io::Result<()>475     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
476         self.inner.deregister(registry)
477     }
478 }
479 
480 impl Read for Receiver {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>481     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
482         self.inner.do_io(|mut sender| sender.read(buf))
483     }
484 
read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>485     fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
486         self.inner.do_io(|mut sender| sender.read_vectored(bufs))
487     }
488 }
489 
490 impl Read for &Receiver {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>491     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
492         self.inner.do_io(|mut sender| sender.read(buf))
493     }
494 
read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>495     fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
496         self.inner.do_io(|mut sender| sender.read_vectored(bufs))
497     }
498 }
499 
500 /// # Notes
501 ///
502 /// The underlying pipe is **not** set to non-blocking.
503 impl From<ChildStdout> for Receiver {
from(stdout: ChildStdout) -> Receiver504     fn from(stdout: ChildStdout) -> Receiver {
505         // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
506         unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
507     }
508 }
509 
510 /// # Notes
511 ///
512 /// The underlying pipe is **not** set to non-blocking.
513 impl From<ChildStderr> for Receiver {
from(stderr: ChildStderr) -> Receiver514     fn from(stderr: ChildStderr) -> Receiver {
515         // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
516         unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
517     }
518 }
519 
520 impl FromRawFd for Receiver {
from_raw_fd(fd: RawFd) -> Receiver521     unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
522         Receiver {
523             inner: IoSource::new(File::from_raw_fd(fd)),
524         }
525     }
526 }
527 
528 impl AsRawFd for Receiver {
as_raw_fd(&self) -> RawFd529     fn as_raw_fd(&self) -> RawFd {
530         self.inner.as_raw_fd()
531     }
532 }
533 
534 impl IntoRawFd for Receiver {
into_raw_fd(self) -> RawFd535     fn into_raw_fd(self) -> RawFd {
536         self.inner.into_inner().into_raw_fd()
537     }
538 }
539 
540 #[cfg(not(target_os = "illumos"))]
set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()>541 fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
542     let value = nonblocking as libc::c_int;
543     if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
544         Err(io::Error::last_os_error())
545     } else {
546         Ok(())
547     }
548 }
549 
550 #[cfg(target_os = "illumos")]
set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()>551 fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
552     let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
553     if flags < 0 {
554         return Err(io::Error::last_os_error());
555     }
556 
557     let nflags = if nonblocking {
558         flags | libc::O_NONBLOCK
559     } else {
560         flags & !libc::O_NONBLOCK
561     };
562 
563     if flags != nflags {
564         if unsafe { libc::fcntl(fd, libc::F_SETFL, nflags) } < 0 {
565             return Err(io::Error::last_os_error());
566         }
567     }
568 
569     Ok(())
570 }
571