1 //! Unix pipe.
2 //!
3 //! See the [`new`] function for documentation.
4
5 use std::fs::File;
6 use std::io::{self, IoSlice, IoSliceMut, Read, Write};
7 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
8 use std::process::{ChildStderr, ChildStdin, ChildStdout};
9
10 use crate::io_source::IoSource;
11 use crate::{event, Interest, Registry, Token};
12
13 /// Create a new non-blocking Unix pipe.
14 ///
15 /// This is a wrapper around Unix's [`pipe(2)`] system call and can be used as
16 /// inter-process or thread communication channel.
17 ///
18 /// This channel may be created before forking the process and then one end used
19 /// in each process, e.g. the parent process has the sending end to send command
20 /// to the child process.
21 ///
22 /// [`pipe(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/pipe.html
23 ///
24 /// # Events
25 ///
26 /// The [`Sender`] can be registered with [`WRITABLE`] interest to receive
27 /// [writable events], the [`Receiver`] with [`READABLE`] interest. Once data is
28 /// written to the `Sender` the `Receiver` will receive an [readable event].
29 ///
30 /// In addition to those events, events will also be generated if the other side
31 /// is dropped. To check if the `Sender` is dropped you'll need to check
32 /// [`is_read_closed`] on events for the `Receiver`, if it returns true the
33 /// `Sender` is dropped. On the `Sender` end check [`is_write_closed`], if it
34 /// returns true the `Receiver` was dropped. Also see the second example below.
35 ///
36 /// [`WRITABLE`]: Interest::WRITABLE
37 /// [writable events]: event::Event::is_writable
38 /// [`READABLE`]: Interest::READABLE
39 /// [readable event]: event::Event::is_readable
40 /// [`is_read_closed`]: event::Event::is_read_closed
41 /// [`is_write_closed`]: event::Event::is_write_closed
42 ///
43 /// # Deregistering
44 ///
45 /// Both `Sender` and `Receiver` will deregister themselves when dropped,
46 /// **iff** the file descriptors are not duplicated (via [`dup(2)`]).
47 ///
48 /// [`dup(2)`]: https://pubs.opengroup.org/onlinepubs/9699919799/functions/dup.html
49 ///
50 /// # Examples
51 ///
52 /// Simple example that writes data into the sending end and read it from the
53 /// receiving end.
54 ///
55 /// ```
56 /// use std::io::{self, Read, Write};
57 ///
58 /// use mio::{Poll, Events, Interest, Token};
59 /// use mio::unix::pipe;
60 ///
61 /// // Unique tokens for the two ends of the channel.
62 /// const PIPE_RECV: Token = Token(0);
63 /// const PIPE_SEND: Token = Token(1);
64 ///
65 /// # fn main() -> io::Result<()> {
66 /// // Create our `Poll` instance and the `Events` container.
67 /// let mut poll = Poll::new()?;
68 /// let mut events = Events::with_capacity(8);
69 ///
70 /// // Create a new pipe.
71 /// let (mut sender, mut receiver) = pipe::new()?;
72 ///
73 /// // Register both ends of the channel.
74 /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
75 /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
76 ///
77 /// const MSG: &[u8; 11] = b"Hello world";
78 ///
79 /// loop {
80 /// poll.poll(&mut events, None)?;
81 ///
82 /// for event in events.iter() {
83 /// match event.token() {
84 /// PIPE_SEND => sender.write(MSG)
85 /// .and_then(|n| if n != MSG.len() {
86 /// // We'll consider a short write an error in this
87 /// // example. NOTE: we can't use `write_all` with
88 /// // non-blocking I/O.
89 /// Err(io::ErrorKind::WriteZero.into())
90 /// } else {
91 /// Ok(())
92 /// })?,
93 /// PIPE_RECV => {
94 /// let mut buf = [0; 11];
95 /// let n = receiver.read(&mut buf)?;
96 /// println!("received: {:?}", &buf[0..n]);
97 /// assert_eq!(n, MSG.len());
98 /// assert_eq!(&buf, &*MSG);
99 /// return Ok(());
100 /// },
101 /// _ => unreachable!(),
102 /// }
103 /// }
104 /// }
105 /// # }
106 /// ```
107 ///
108 /// Example that receives an event once the `Sender` is dropped.
109 ///
110 /// ```
111 /// # use std::io;
112 /// #
113 /// # use mio::{Poll, Events, Interest, Token};
114 /// # use mio::unix::pipe;
115 /// #
116 /// # const PIPE_RECV: Token = Token(0);
117 /// # const PIPE_SEND: Token = Token(1);
118 /// #
119 /// # fn main() -> io::Result<()> {
120 /// // Same setup as in the example above.
121 /// let mut poll = Poll::new()?;
122 /// let mut events = Events::with_capacity(8);
123 ///
124 /// let (mut sender, mut receiver) = pipe::new()?;
125 ///
126 /// poll.registry().register(&mut receiver, PIPE_RECV, Interest::READABLE)?;
127 /// poll.registry().register(&mut sender, PIPE_SEND, Interest::WRITABLE)?;
128 ///
129 /// // Drop the sender.
130 /// drop(sender);
131 ///
132 /// poll.poll(&mut events, None)?;
133 ///
134 /// for event in events.iter() {
135 /// match event.token() {
136 /// PIPE_RECV if event.is_read_closed() => {
137 /// // Detected that the sender was dropped.
138 /// println!("Sender dropped!");
139 /// return Ok(());
140 /// },
141 /// _ => unreachable!(),
142 /// }
143 /// }
144 /// # unreachable!();
145 /// # }
146 /// ```
new() -> io::Result<(Sender, Receiver)>147 pub fn new() -> io::Result<(Sender, Receiver)> {
148 let mut fds: [RawFd; 2] = [-1, -1];
149
150 #[cfg(any(
151 target_os = "android",
152 target_os = "dragonfly",
153 target_os = "freebsd",
154 target_os = "linux",
155 target_os = "netbsd",
156 target_os = "openbsd",
157 target_os = "illumos",
158 target_os = "redox",
159 ))]
160 unsafe {
161 if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
162 return Err(io::Error::last_os_error());
163 }
164 }
165
166 #[cfg(any(target_os = "ios", target_os = "macos"))]
167 unsafe {
168 // For platforms that don't have `pipe2(2)` we need to manually set the
169 // correct flags on the file descriptor.
170 if libc::pipe(fds.as_mut_ptr()) != 0 {
171 return Err(io::Error::last_os_error());
172 }
173
174 for fd in &fds {
175 if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
176 || libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
177 {
178 let err = io::Error::last_os_error();
179 // Don't leak file descriptors. Can't handle closing error though.
180 let _ = libc::close(fds[0]);
181 let _ = libc::close(fds[1]);
182 return Err(err);
183 }
184 }
185 }
186
187 #[cfg(not(any(
188 target_os = "android",
189 target_os = "dragonfly",
190 target_os = "freebsd",
191 target_os = "illumos",
192 target_os = "ios",
193 target_os = "linux",
194 target_os = "macos",
195 target_os = "netbsd",
196 target_os = "openbsd",
197 target_os = "redox",
198 )))]
199 compile_error!("unsupported target for `mio::unix::pipe`");
200
201 // SAFETY: we just initialised the `fds` above.
202 let r = unsafe { Receiver::from_raw_fd(fds[0]) };
203 let w = unsafe { Sender::from_raw_fd(fds[1]) };
204
205 Ok((w, r))
206 }
207
/// Sending (write) end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Sender {
    // The pipe's file descriptor, held as a `File` inside an `IoSource`.
    inner: IoSource<File>,
}
215
216 impl Sender {
217 /// Set the `Sender` into or out of non-blocking mode.
set_nonblocking(&self, nonblocking: bool) -> io::Result<()>218 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
219 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
220 }
221
222 /// Execute an I/O operation ensuring that the socket receives more events
223 /// if it hits a [`WouldBlock`] error.
224 ///
225 /// # Notes
226 ///
227 /// This method is required to be called for **all** I/O operations to
228 /// ensure the user will receive events once the socket is ready again after
229 /// returning a [`WouldBlock`] error.
230 ///
231 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
232 ///
233 /// # Examples
234 ///
235 /// ```
236 /// # use std::error::Error;
237 /// #
238 /// # fn main() -> Result<(), Box<dyn Error>> {
239 /// use std::io;
240 /// use std::os::unix::io::AsRawFd;
241 /// use mio::unix::pipe;
242 ///
243 /// let (sender, receiver) = pipe::new()?;
244 ///
245 /// // Wait until the sender is writable...
246 ///
247 /// // Write to the sender using a direct libc call, of course the
248 /// // `io::Write` implementation would be easier to use.
249 /// let buf = b"hello";
250 /// let n = sender.try_io(|| {
251 /// let buf_ptr = &buf as *const _ as *const _;
252 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
253 /// if res != -1 {
254 /// Ok(res as usize)
255 /// } else {
256 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
257 /// // should return `WouldBlock` error.
258 /// Err(io::Error::last_os_error())
259 /// }
260 /// })?;
261 /// eprintln!("write {} bytes", n);
262 ///
263 /// // Wait until the receiver is readable...
264 ///
265 /// // Read from the receiver using a direct libc call, of course the
266 /// // `io::Read` implementation would be easier to use.
267 /// let mut buf = [0; 512];
268 /// let n = receiver.try_io(|| {
269 /// let buf_ptr = &mut buf as *mut _ as *mut _;
270 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
271 /// if res != -1 {
272 /// Ok(res as usize)
273 /// } else {
274 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
275 /// // should return `WouldBlock` error.
276 /// Err(io::Error::last_os_error())
277 /// }
278 /// })?;
279 /// eprintln!("read {} bytes", n);
280 /// # Ok(())
281 /// # }
282 /// ```
try_io<F, T>(&self, f: F) -> io::Result<T> where F: FnOnce() -> io::Result<T>,283 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
284 where
285 F: FnOnce() -> io::Result<T>,
286 {
287 self.inner.do_io(|_| f())
288 }
289 }
290
291 impl event::Source for Sender {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>292 fn register(
293 &mut self,
294 registry: &Registry,
295 token: Token,
296 interests: Interest,
297 ) -> io::Result<()> {
298 self.inner.register(registry, token, interests)
299 }
300
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>301 fn reregister(
302 &mut self,
303 registry: &Registry,
304 token: Token,
305 interests: Interest,
306 ) -> io::Result<()> {
307 self.inner.reregister(registry, token, interests)
308 }
309
deregister(&mut self, registry: &Registry) -> io::Result<()>310 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
311 self.inner.deregister(registry)
312 }
313 }
314
315 impl Write for Sender {
write(&mut self, buf: &[u8]) -> io::Result<usize>316 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
317 self.inner.do_io(|mut sender| sender.write(buf))
318 }
319
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>320 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
321 self.inner.do_io(|mut sender| sender.write_vectored(bufs))
322 }
323
flush(&mut self) -> io::Result<()>324 fn flush(&mut self) -> io::Result<()> {
325 self.inner.do_io(|mut sender| sender.flush())
326 }
327 }
328
329 impl Write for &Sender {
write(&mut self, buf: &[u8]) -> io::Result<usize>330 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
331 self.inner.do_io(|mut sender| sender.write(buf))
332 }
333
write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize>334 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
335 self.inner.do_io(|mut sender| sender.write_vectored(bufs))
336 }
337
flush(&mut self) -> io::Result<()>338 fn flush(&mut self) -> io::Result<()> {
339 self.inner.do_io(|mut sender| sender.flush())
340 }
341 }
342
343 /// # Notes
344 ///
345 /// The underlying pipe is **not** set to non-blocking.
346 impl From<ChildStdin> for Sender {
from(stdin: ChildStdin) -> Sender347 fn from(stdin: ChildStdin) -> Sender {
348 // Safety: `ChildStdin` is guaranteed to be a valid file descriptor.
349 unsafe { Sender::from_raw_fd(stdin.into_raw_fd()) }
350 }
351 }
352
353 impl FromRawFd for Sender {
from_raw_fd(fd: RawFd) -> Sender354 unsafe fn from_raw_fd(fd: RawFd) -> Sender {
355 Sender {
356 inner: IoSource::new(File::from_raw_fd(fd)),
357 }
358 }
359 }
360
361 impl AsRawFd for Sender {
as_raw_fd(&self) -> RawFd362 fn as_raw_fd(&self) -> RawFd {
363 self.inner.as_raw_fd()
364 }
365 }
366
367 impl IntoRawFd for Sender {
into_raw_fd(self) -> RawFd368 fn into_raw_fd(self) -> RawFd {
369 self.inner.into_inner().into_raw_fd()
370 }
371 }
372
/// Receiving (read) end of a Unix pipe.
///
/// See [`new`] for documentation, including examples.
#[derive(Debug)]
pub struct Receiver {
    // The pipe's file descriptor, held as a `File` inside an `IoSource`.
    inner: IoSource<File>,
}
380
381 impl Receiver {
382 /// Set the `Receiver` into or out of non-blocking mode.
set_nonblocking(&self, nonblocking: bool) -> io::Result<()>383 pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> {
384 set_nonblocking(self.inner.as_raw_fd(), nonblocking)
385 }
386
387 /// Execute an I/O operation ensuring that the socket receives more events
388 /// if it hits a [`WouldBlock`] error.
389 ///
390 /// # Notes
391 ///
392 /// This method is required to be called for **all** I/O operations to
393 /// ensure the user will receive events once the socket is ready again after
394 /// returning a [`WouldBlock`] error.
395 ///
396 /// [`WouldBlock`]: io::ErrorKind::WouldBlock
397 ///
398 /// # Examples
399 ///
400 /// ```
401 /// # use std::error::Error;
402 /// #
403 /// # fn main() -> Result<(), Box<dyn Error>> {
404 /// use std::io;
405 /// use std::os::unix::io::AsRawFd;
406 /// use mio::unix::pipe;
407 ///
408 /// let (sender, receiver) = pipe::new()?;
409 ///
410 /// // Wait until the sender is writable...
411 ///
412 /// // Write to the sender using a direct libc call, of course the
413 /// // `io::Write` implementation would be easier to use.
414 /// let buf = b"hello";
415 /// let n = sender.try_io(|| {
416 /// let buf_ptr = &buf as *const _ as *const _;
417 /// let res = unsafe { libc::write(sender.as_raw_fd(), buf_ptr, buf.len()) };
418 /// if res != -1 {
419 /// Ok(res as usize)
420 /// } else {
421 /// // If EAGAIN or EWOULDBLOCK is set by libc::write, the closure
422 /// // should return `WouldBlock` error.
423 /// Err(io::Error::last_os_error())
424 /// }
425 /// })?;
426 /// eprintln!("write {} bytes", n);
427 ///
428 /// // Wait until the receiver is readable...
429 ///
430 /// // Read from the receiver using a direct libc call, of course the
431 /// // `io::Read` implementation would be easier to use.
432 /// let mut buf = [0; 512];
433 /// let n = receiver.try_io(|| {
434 /// let buf_ptr = &mut buf as *mut _ as *mut _;
435 /// let res = unsafe { libc::read(receiver.as_raw_fd(), buf_ptr, buf.len()) };
436 /// if res != -1 {
437 /// Ok(res as usize)
438 /// } else {
439 /// // If EAGAIN or EWOULDBLOCK is set by libc::read, the closure
440 /// // should return `WouldBlock` error.
441 /// Err(io::Error::last_os_error())
442 /// }
443 /// })?;
444 /// eprintln!("read {} bytes", n);
445 /// # Ok(())
446 /// # }
447 /// ```
try_io<F, T>(&self, f: F) -> io::Result<T> where F: FnOnce() -> io::Result<T>,448 pub fn try_io<F, T>(&self, f: F) -> io::Result<T>
449 where
450 F: FnOnce() -> io::Result<T>,
451 {
452 self.inner.do_io(|_| f())
453 }
454 }
455
456 impl event::Source for Receiver {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>457 fn register(
458 &mut self,
459 registry: &Registry,
460 token: Token,
461 interests: Interest,
462 ) -> io::Result<()> {
463 self.inner.register(registry, token, interests)
464 }
465
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>466 fn reregister(
467 &mut self,
468 registry: &Registry,
469 token: Token,
470 interests: Interest,
471 ) -> io::Result<()> {
472 self.inner.reregister(registry, token, interests)
473 }
474
deregister(&mut self, registry: &Registry) -> io::Result<()>475 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
476 self.inner.deregister(registry)
477 }
478 }
479
480 impl Read for Receiver {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>481 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
482 self.inner.do_io(|mut sender| sender.read(buf))
483 }
484
read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>485 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
486 self.inner.do_io(|mut sender| sender.read_vectored(bufs))
487 }
488 }
489
490 impl Read for &Receiver {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>491 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
492 self.inner.do_io(|mut sender| sender.read(buf))
493 }
494
read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize>495 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
496 self.inner.do_io(|mut sender| sender.read_vectored(bufs))
497 }
498 }
499
500 /// # Notes
501 ///
502 /// The underlying pipe is **not** set to non-blocking.
503 impl From<ChildStdout> for Receiver {
from(stdout: ChildStdout) -> Receiver504 fn from(stdout: ChildStdout) -> Receiver {
505 // Safety: `ChildStdout` is guaranteed to be a valid file descriptor.
506 unsafe { Receiver::from_raw_fd(stdout.into_raw_fd()) }
507 }
508 }
509
510 /// # Notes
511 ///
512 /// The underlying pipe is **not** set to non-blocking.
513 impl From<ChildStderr> for Receiver {
from(stderr: ChildStderr) -> Receiver514 fn from(stderr: ChildStderr) -> Receiver {
515 // Safety: `ChildStderr` is guaranteed to be a valid file descriptor.
516 unsafe { Receiver::from_raw_fd(stderr.into_raw_fd()) }
517 }
518 }
519
520 impl FromRawFd for Receiver {
from_raw_fd(fd: RawFd) -> Receiver521 unsafe fn from_raw_fd(fd: RawFd) -> Receiver {
522 Receiver {
523 inner: IoSource::new(File::from_raw_fd(fd)),
524 }
525 }
526 }
527
528 impl AsRawFd for Receiver {
as_raw_fd(&self) -> RawFd529 fn as_raw_fd(&self) -> RawFd {
530 self.inner.as_raw_fd()
531 }
532 }
533
534 impl IntoRawFd for Receiver {
into_raw_fd(self) -> RawFd535 fn into_raw_fd(self) -> RawFd {
536 self.inner.into_inner().into_raw_fd()
537 }
538 }
539
540 #[cfg(not(target_os = "illumos"))]
set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()>541 fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
542 let value = nonblocking as libc::c_int;
543 if unsafe { libc::ioctl(fd, libc::FIONBIO, &value) } == -1 {
544 Err(io::Error::last_os_error())
545 } else {
546 Ok(())
547 }
548 }
549
/// Switch `fd` into (`true`) or out of (`false`) non-blocking mode.
///
/// The illumos variant reads the current file status flags with `F_GETFL`,
/// toggles `O_NONBLOCK`, and writes them back with `F_SETFL` only when the
/// value actually changed.
///
/// Returns the OS error if either `fcntl` call fails.
#[cfg(target_os = "illumos")]
fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {
    let current = unsafe { libc::fcntl(fd, libc::F_GETFL) };
    if current < 0 {
        return Err(io::Error::last_os_error());
    }

    let wanted = match nonblocking {
        true => current | libc::O_NONBLOCK,
        false => current & !libc::O_NONBLOCK,
    };

    // Short-circuit: `F_SETFL` is skipped when the flag is already as wanted.
    if wanted != current && unsafe { libc::fcntl(fd, libc::F_SETFL, wanted) } < 0 {
        return Err(io::Error::last_os_error());
    }

    Ok(())
}
571