//! A cross-platform library for opening OS pipes, like those from
//! [`pipe`](https://man7.org/linux/man-pages/man2/pipe.2.html) on Linux
//! or
//! [`CreatePipe`](https://docs.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe)
//! on Windows. The Rust standard library provides
//! [`Stdio::piped`](https://doc.rust-lang.org/std/process/struct.Stdio.html#method.piped)
//! for simple use cases involving child processes, but it doesn't
//! support creating pipes directly. This crate fills that gap.
//!
//! - [Docs](https://docs.rs/os_pipe)
//! - [Crate](https://crates.io/crates/os_pipe)
//! - [Repo](https://github.com/oconnor663/os_pipe.rs)
//!
//! # Common deadlocks related to pipes
//!
//! When you work with pipes, you often end up debugging a deadlock at
//! some point. These can be confusing if you don't know why they
//! happen. Here are two things you need to know:
//!
//! 1. Pipe reads will block waiting for input as long as there's at
//!    least one writer still open. **If you forget to close a writer,
//!    reads will block forever.** This includes writers that you give
//!    to child processes.
//! 2. Pipes have an internal buffer of some fixed size. On Linux for
//!    example, pipe buffers are 64 KiB by default. When the buffer is
//!    full, writes will block waiting for space. **If the buffer is
//!    full and whoever holds the read end never reads from it, writes
//!    will block forever.**
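//!
//! The first rule applies to every copy of a writer, including clones
//! made with `try_clone`. Here is a minimal sketch (the helper
//! `read_after_all_writers_drop` is a hypothetical name, not part of
//! this crate's API): `read_to_string` reaches end-of-file only because
//! both the original writer and its clone are dropped first. Leaving
//! out either `drop` call would make it block forever.
//!
//! ```rust
//! use std::io::prelude::*;
//!
//! // Hypothetical helper: writes `msg` through two handles to the same
//! // pipe, closes both, and returns everything the reader received.
//! fn read_after_all_writers_drop(msg: &str) -> std::io::Result<String> {
//!     let (mut reader, mut writer) = os_pipe::pipe()?;
//!     let mut writer_clone = writer.try_clone()?;
//!     // Small writes fit in the pipe buffer, so they won't block here.
//!     writer.write_all(msg.as_bytes())?;
//!     writer_clone.write_all(msg.as_bytes())?;
//!     // Close *every* writer. Without both drops, the read below would
//!     // wait forever for more input.
//!     drop(writer);
//!     drop(writer_clone);
//!     let mut output = String::new();
//!     reader.read_to_string(&mut output)?;
//!     Ok(output)
//! }
//!
//! fn main() -> std::io::Result<()> {
//!     assert_eq!(read_after_all_writers_drop("hi")?, "hihi");
//!     Ok(())
//! }
//! ```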
//!
//! Deadlocks caused by a forgotten writer usually show up immediately,
//! which makes them relatively easy to fix once you know what to look
//! for. (See "Avoid a deadlock!" in the example code below.) However,
//! deadlocks caused by full pipe buffers are trickier. These might only
//! show up for larger inputs, and they might be timing-dependent or
//! platform-dependent. If you find that writing to a pipe deadlocks
//! sometimes, think about who's supposed to be reading from that pipe,
//! and whether that thread or process might be blocked on something
//! else. For more on this, see the [Gotchas
//! Doc](https://github.com/oconnor663/duct.py/blob/master/gotchas.md#using-io-threads-to-avoid-blocking-children)
//! from the [`duct`](https://github.com/oconnor663/duct.rs) crate. (And
//! consider whether [`duct`](https://github.com/oconnor663/duct.rs)
//! might be a good fit for your use case.)
//!
//! # Examples
//!
//! Here we write a single byte into a pipe and read it back out:
//!
//! ```rust
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! use std::io::prelude::*;
//!
//! let (mut reader, mut writer) = os_pipe::pipe()?;
//! // XXX: If this write blocks, we'll never get to the read.
//! writer.write_all(b"x")?;
//! let mut output = [0];
//! reader.read_exact(&mut output)?;
//! assert_eq!(b"x", &output);
//! # Ok(())
//! # }
//! ```
//!
//! This is a minimal working example, but as discussed in the section
//! above, reading and writing on the same thread like this is
//! deadlock-prone. If we wrote 100 KB instead of just one byte, this
//! example would block on `write_all`, it would never make it to
//! `read_exact`, and that would be a deadlock. Doing the read and write
//! from different threads or different processes would fix the
//! deadlock.
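//!
//! Here is a sketch of that fix using a second thread, assuming 100 KB
//! is more than the pipe buffer holds (true of Linux's 64 KiB default).
//! The helper `round_trip_on_two_threads` is a hypothetical name. The
//! writer thread may block on a full buffer, but the main thread keeps
//! draining the pipe, so the write always makes progress. Moving
//! `writer` into the thread also means it's dropped when the thread
//! finishes, which is what lets `read_to_end` return.
//!
//! ```rust
//! use std::io::prelude::*;
//! use std::thread;
//!
//! // Hypothetical helper: sends `data` through a pipe from a background
//! // thread and reads it back on the calling thread.
//! fn round_trip_on_two_threads(data: Vec<u8>) -> std::io::Result<Vec<u8>> {
//!     let (mut reader, mut writer) = os_pipe::pipe()?;
//!     let writer_thread = thread::spawn(move || -> std::io::Result<()> {
//!         // This may block while the buffer is full, until the main
//!         // thread reads some of it.
//!         writer.write_all(&data)?;
//!         // `writer` is dropped when this closure returns, closing the
//!         // last write end.
//!         Ok(())
//!     });
//!     let mut output = Vec::new();
//!     reader.read_to_end(&mut output)?;
//!     // Panic if the writer thread panicked; otherwise propagate its
//!     // I/O result.
//!     writer_thread.join().expect("writer thread panicked")?;
//!     Ok(output)
//! }
//!
//! fn main() -> std::io::Result<()> {
//!     let data = vec![b'x'; 100_000];
//!     assert_eq!(round_trip_on_two_threads(data.clone())?, data);
//!     Ok(())
//! }
//! ```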
//!
//! For a more complex example, here we join the stdout and stderr of a
//! child process into a single pipe. To do that we open a pipe, clone
//! its writer, and set that pair of writers as the child's stdout and
//! stderr. (This is possible because `PipeWriter` implements
//! `Into<Stdio>`.) Then we can read interleaved output from the pipe
//! reader. This example is deadlock-free, but note the comment about
//! closing the writers.
//!
//! ```rust
//! # use std::io::prelude::*;
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // We're going to spawn a child process that prints "foo" to stdout
//! // and "bar" to stderr, and we'll combine these into a single pipe.
//! let mut command = std::process::Command::new("python");
//! command.args(&["-c", r#"
//! import sys
//! sys.stdout.write("foo")
//! sys.stdout.flush()
//! sys.stderr.write("bar")
//! sys.stderr.flush()
//! "#]);
//!
//! // Here's the interesting part. Open a pipe, clone its writer, and
//! // set that pair of writers as the child's stdout and stderr.
//! let (mut reader, writer) = os_pipe::pipe()?;
//! let writer_clone = writer.try_clone()?;
//! command.stdout(writer);
//! command.stderr(writer_clone);
//!
//! // Now start the child process running.
//! let mut handle = command.spawn()?;
//!
//! // Avoid a deadlock! This parent process is still holding open pipe
//! // writers inside the Command object, and we have to close those
//! // before we read. Here we do this by dropping the Command object.
//! drop(command);
//!
//! // Finally we can read all the output and clean up the child.
//! let mut output = String::new();
//! reader.read_to_string(&mut output)?;
//! handle.wait()?;
//! assert_eq!(output, "foobar");
//! # Ok(())
//! # }
//! ```
//!
//! Note that the [`duct`](https://github.com/oconnor663/duct.rs) crate
//! can reproduce the example above in a single line of code, with no
//! risk of deadlocks and no risk of leaking [zombie
//! children](https://en.wikipedia.org/wiki/Zombie_process).

use std::fs::File;
use std::io;
use std::process::Stdio;

#[cfg(not(windows))]
#[path = "unix.rs"]
mod sys;
#[cfg(windows)]
#[path = "windows.rs"]
mod sys;

/// The reading end of a pipe, returned by [`pipe`](fn.pipe.html).
///
/// `PipeReader` implements `Into<Stdio>`, so you can pass it as an argument to
/// `Command::stdin` to spawn a child process that reads from the pipe.
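///
/// As an illustration, this sketch hands the read end to a child process
/// as its stdin. Like the crate-level example, it assumes a `python`
/// executable is on the `PATH`; the child program and the helper name
/// `uppercase_via_child` are hypothetical. The parent drops its
/// `PipeWriter` after writing, so the child's read sees end-of-file.
///
/// ```rust
/// use std::io::prelude::*;
/// use std::process::{Command, Stdio};
///
/// // Hypothetical helper: sends `input` to a child through an os_pipe pipe
/// // and returns what the child prints.
/// fn uppercase_via_child(input: &str) -> std::io::Result<String> {
///     let (reader, mut writer) = os_pipe::pipe()?;
///     let mut child = Command::new("python")
///         .args(&["-c", "import sys; sys.stdout.write(sys.stdin.read().upper())"])
///         // `PipeReader` converts into `Stdio`, so it can be the child's stdin.
///         .stdin(reader)
///         .stdout(Stdio::piped())
///         .spawn()?;
///     // A small write fits in the pipe buffer, so it won't block.
///     writer.write_all(input.as_bytes())?;
///     // Close the write end, or the child would wait for input forever.
///     drop(writer);
///     let mut output = String::new();
///     child.stdout.take().expect("stdout was piped").read_to_string(&mut output)?;
///     child.wait()?;
///     Ok(output)
/// }
///
/// fn main() -> std::io::Result<()> {
///     assert_eq!(uppercase_via_child("quack")?, "QUACK");
///     Ok(())
/// }
/// ```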
#[derive(Debug)]
pub struct PipeReader(
    // We use std::fs::File here for two reasons: OwnedFd and OwnedHandle are platform-specific,
    // and this gives us read/write/flush for free.
    File,
);

impl PipeReader {
    pub fn try_clone(&self) -> io::Result<PipeReader> {
        self.0.try_clone().map(PipeReader)
    }
}

impl io::Read for PipeReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.0.read(buf)
    }
}

impl<'a> io::Read for &'a PipeReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (&self.0).read(buf)
    }
}

impl From<PipeReader> for Stdio {
    fn from(p: PipeReader) -> Stdio {
        p.0.into()
    }
}

/// The writing end of a pipe, returned by [`pipe`](fn.pipe.html).
///
/// `PipeWriter` implements `Into<Stdio>`, so you can pass it as an argument to
/// `Command::stdout` or `Command::stderr` to spawn a child process that writes
/// to the pipe.
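///
/// Because `io::Write` is also implemented for `&PipeWriter`, several
/// threads can write through shared references to one writer. A minimal
/// sketch, assuming Rust 1.63 or later for `std::thread::scope` (the helper
/// name `write_from_two_threads` is hypothetical):
///
/// ```rust
/// use std::io::prelude::*;
/// use std::thread;
///
/// // Hypothetical helper: two threads write through `&PipeWriter`, then the
/// // calling thread reads everything back.
/// fn write_from_two_threads() -> std::io::Result<Vec<u8>> {
///     let (mut reader, writer) = os_pipe::pipe()?;
///     thread::scope(|s| -> std::io::Result<()> {
///         let first = s.spawn(|| (&writer).write_all(b"aaaa"));
///         let second = s.spawn(|| (&writer).write_all(b"bbbb"));
///         first.join().expect("first writer panicked")?;
///         second.join().expect("second writer panicked")?;
///         Ok(())
///     })?;
///     // Both threads have finished, so this drop closes the only write end.
///     drop(writer);
///     let mut output = Vec::new();
///     reader.read_to_end(&mut output)?;
///     Ok(output)
/// }
///
/// fn main() -> std::io::Result<()> {
///     let output = write_from_two_threads()?;
///     // The two messages may arrive in either order, so only count bytes.
///     assert_eq!(output.iter().filter(|&&b| b == b'a').count(), 4);
///     assert_eq!(output.iter().filter(|&&b| b == b'b').count(), 4);
///     Ok(())
/// }
/// ```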
#[derive(Debug)]
pub struct PipeWriter(File);

impl PipeWriter {
    pub fn try_clone(&self) -> io::Result<PipeWriter> {
        self.0.try_clone().map(PipeWriter)
    }
}

impl io::Write for PipeWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0.write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0.flush()
    }
}

impl<'a> io::Write for &'a PipeWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (&self.0).write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        (&self.0).flush()
    }
}

impl From<PipeWriter> for Stdio {
    fn from(p: PipeWriter) -> Stdio {
        p.0.into()
    }
}

/// Open a new pipe and return a [`PipeReader`] and [`PipeWriter`] pair.
///
/// This corresponds to the `pipe2` library call on POSIX and the
/// `CreatePipe` library call on Windows (though these implementation
/// details might change). These pipes are non-inheritable, so new child
/// processes won't receive a copy of them unless they're explicitly
/// passed as stdin/stdout/stderr.
///
/// [`PipeReader`]: struct.PipeReader.html
/// [`PipeWriter`]: struct.PipeWriter.html
pub fn pipe() -> io::Result<(PipeReader, PipeWriter)> {
    sys::pipe()
}

/// Get a duplicated copy of the current process's standard input, as a
/// [`PipeReader`].
///
/// Reading directly from this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stdin`]. [`PipeReader`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdin`]. This is
/// equivalent to [`Stdio::inherit`], though, so it's usually not necessary
/// unless you need a collection of different pipes.
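///
/// A sketch of the "collection" case: because `dup_stdin` returns a
/// `PipeReader`, the parent's own stdin and an ordinary pipe can be stored
/// in the same `Vec` and chosen between later (for example, to pass one to
/// `Command::stdin`). The helper name `candidate_inputs` is hypothetical,
/// and the sketch assumes this process has a valid stdin to duplicate.
///
/// ```rust
/// use std::io::prelude::*;
///
/// // Hypothetical helper: returns two possible inputs of the same type.
/// fn candidate_inputs() -> std::io::Result<Vec<os_pipe::PipeReader>> {
///     let (reader, mut writer) = os_pipe::pipe()?;
///     // A small write fits in the pipe buffer, so it won't block.
///     writer.write_all(b"canned input")?;
///     // Close the write end so a reader of this pipe sees end-of-file.
///     drop(writer);
///     Ok(vec![os_pipe::dup_stdin()?, reader])
/// }
///
/// fn main() -> std::io::Result<()> {
///     let mut inputs = candidate_inputs()?;
///     assert_eq!(inputs.len(), 2);
///     // Read the canned pipe directly. The stdin copy at index 0 is left
///     // alone, since reading it directly isn't recommended.
///     let mut text = String::new();
///     inputs[1].read_to_string(&mut text)?;
///     assert_eq!(text, "canned input");
///     Ok(())
/// }
/// ```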
///
/// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html
/// [`PipeReader`]: struct.PipeReader.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdin`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdin
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stdin() -> io::Result<PipeReader> {
    sys::dup(io::stdin()).map(PipeReader::from)
}

/// Get a duplicated copy of the current process's standard output, as a
/// [`PipeWriter`](struct.PipeWriter.html).
///
/// Writing directly to this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stdout`]. [`PipeWriter`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdout`] or
/// [`Command::stderr`]. This can be useful if you want the child's stderr to go
/// to the parent's stdout.
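///
/// A sketch of that case, assuming (as in the crate-level example) a
/// `python` executable on the `PATH`; the helper name
/// `run_with_stderr_on_stdout` is hypothetical. The child writes only to its
/// stderr, but since that stderr is a duplicate of the parent's stdout, the
/// message appears on the parent's stdout.
///
/// ```rust
/// use std::process::Command;
///
/// // Hypothetical helper: runs a child whose stderr is this process's
/// // stdout, and reports whether the child exited successfully.
/// fn run_with_stderr_on_stdout() -> std::io::Result<bool> {
///     let status = Command::new("python")
///         .args(&["-c", "import sys; sys.stderr.write('this goes to stdout\\n')"])
///         // `PipeWriter` converts into `Stdio`, so it can be the child's stderr.
///         .stderr(os_pipe::dup_stdout()?)
///         .status()?;
///     Ok(status.success())
/// }
///
/// fn main() -> std::io::Result<()> {
///     assert!(run_with_stderr_on_stdout()?);
///     Ok(())
/// }
/// ```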
///
/// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html
/// [`PipeWriter`]: struct.PipeWriter.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdout`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdout
/// [`Command::stderr`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stderr
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stdout() -> io::Result<PipeWriter> {
    sys::dup(io::stdout()).map(PipeWriter::from)
}

/// Get a duplicated copy of the current process's standard error, as a
/// [`PipeWriter`](struct.PipeWriter.html).
///
/// Writing directly to this pipe isn't recommended, because it's not
/// synchronized with [`std::io::stderr`]. [`PipeWriter`] implements
/// [`Into<Stdio>`], so it can be passed directly to [`Command::stdout`] or
/// [`Command::stderr`]. This can be useful if you want the child's stdout to go
/// to the parent's stderr.
///
/// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html
/// [`PipeWriter`]: struct.PipeWriter.html
/// [`Into<Stdio>`]: https://doc.rust-lang.org/std/process/struct.Stdio.html
/// [`Command::stdout`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stdout
/// [`Command::stderr`]: https://doc.rust-lang.org/std/process/struct.Command.html#method.stderr
/// [`Stdio::inherit`]: https://doc.rust-lang.org/std/process/struct.Stdio.html#method.inherit
pub fn dup_stderr() -> io::Result<PipeWriter> {
    sys::dup(io::stderr()).map(PipeWriter::from)
}

#[cfg(test)]
mod tests {
    use std::env::consts::EXE_EXTENSION;
    use std::io::prelude::*;
    use std::path::{Path, PathBuf};
    use std::process::Command;
    use std::sync::Once;
    use std::thread;

    fn path_to_exe(name: &str) -> PathBuf {
        // This project defines some associated binaries for testing, and we shell out to them in
        // these tests. `cargo test` doesn't automatically build associated binaries, so this
        // function takes care of building them explicitly, with the right debug/release flavor.
        static CARGO_BUILD_ONCE: Once = Once::new();
        CARGO_BUILD_ONCE.call_once(|| {
            let mut build_command = Command::new("cargo");
            build_command.args(&["build", "--quiet"]);
            if !cfg!(debug_assertions) {
                build_command.arg("--release");
            }
            let build_status = build_command.status().unwrap();
            assert!(
                build_status.success(),
                "Cargo failed to build associated binaries."
            );
        });
        let flavor = if cfg!(debug_assertions) {
            "debug"
        } else {
            "release"
        };
        Path::new("target")
            .join(flavor)
            .join(name)
            .with_extension(EXE_EXTENSION)
    }

    #[test]
    fn test_pipe_some_data() {
        let (mut reader, mut writer) = crate::pipe().unwrap();
        // A small write won't fill the pipe buffer, so it won't block this thread.
        writer.write_all(b"some stuff").unwrap();
        drop(writer);
        let mut out = String::new();
        reader.read_to_string(&mut out).unwrap();
        assert_eq!(out, "some stuff");
    }

    #[test]
    fn test_pipe_some_data_with_refs() {
        // As with `File`, there's a second set of impls for shared
        // refs. Test those.
        let (reader, writer) = crate::pipe().unwrap();
        let mut reader_ref = &reader;
        {
            let mut writer_ref = &writer;
            // A small write won't fill the pipe buffer, so it won't block this thread.
            writer_ref.write_all(b"some stuff").unwrap();
        }
        drop(writer);
        let mut out = String::new();
        reader_ref.read_to_string(&mut out).unwrap();
        assert_eq!(out, "some stuff");
    }

    #[test]
    fn test_pipe_no_data() {
        let (mut reader, writer) = crate::pipe().unwrap();
        drop(writer);
        let mut out = String::new();
        reader.read_to_string(&mut out).unwrap();
        assert_eq!(out, "");
    }

    #[test]
    fn test_pipe_a_megabyte_of_data_from_another_thread() {
        let data = vec![0xff; 1_000_000];
        let data_copy = data.clone();
        let (mut reader, mut writer) = crate::pipe().unwrap();
        let joiner = thread::spawn(move || {
            writer.write_all(&data_copy).unwrap();
            // This drop happens automatically, so writing it out here is mostly
            // just for clarity. For what it's worth, it also guards against
            // accidentally forgetting to drop if we switch to scoped threads or
            // something like that and change this to a non-moving closure. The
            // explicit drop forces `writer` to move.
            drop(writer);
        });
        let mut out = Vec::new();
        reader.read_to_end(&mut out).unwrap();
        joiner.join().unwrap();
        assert_eq!(out, data);
    }

    #[test]
    fn test_pipes_are_not_inheritable() {
        // Create pipes for a child process.
        let (input_reader, mut input_writer) = crate::pipe().unwrap();
        let (mut output_reader, output_writer) = crate::pipe().unwrap();

        // Create a bunch of duplicated copies, which we'll close later. This
        // tests that duplication preserves non-inheritability.
        let ir_dup = input_reader.try_clone().unwrap();
        let iw_dup = input_writer.try_clone().unwrap();
        let or_dup = output_reader.try_clone().unwrap();
        let ow_dup = output_writer.try_clone().unwrap();

        // Spawn the child. Note that this temporary Command object takes
        // ownership of our copies of the child's stdin and stdout, and then
        // closes them immediately when it drops. That stops us from blocking
        // our own read below. We use our own simple implementation of cat for
        // compatibility with Windows.
        let mut child = Command::new(path_to_exe("cat"))
            .stdin(input_reader)
            .stdout(output_writer)
            .spawn()
            .unwrap();

        // Drop all the dups now that the child is spawned.
        drop(ir_dup);
        drop(iw_dup);
        drop(or_dup);
        drop(ow_dup);

        // Write to the child's stdin. This is a small write, so it shouldn't
        // block.
        input_writer.write_all(b"hello").unwrap();
        drop(input_writer);

        // Read from the child's stdout. If this child has accidentally
        // inherited the write end of its own stdin, then it will never exit,
        // and this read will block forever. That's what this test is all
        // about.
        let mut output = Vec::new();
        output_reader.read_to_end(&mut output).unwrap();
        child.wait().unwrap();

        // Confirm that we got the right bytes.
        assert_eq!(b"hello", &*output);
    }

    #[test]
    fn test_parent_handles() {
        // This test invokes the `swap` test program, which uses dup_stdout() and
        // dup_stderr() to swap the outputs for another child that it spawns.

        // Create a pipe for the child's stdin.
        let (reader, mut writer) = crate::pipe().unwrap();

        // Write input. This shouldn't block because it's small. Then close the write end, or else
        // the child will hang.
        writer.write_all(b"quack").unwrap();
        drop(writer);

        // Use `swap` to run `cat_both`. `cat_both` will read "quack" from stdin
        // and write it to stdout and stderr with different tags. But because we
        // run it inside `swap`, the tags in the output should be backwards.
        let output = Command::new(path_to_exe("swap"))
            .arg(path_to_exe("cat_both"))
            .stdin(reader)
            .output()
            .unwrap();

        // Check for a clean exit.
        assert!(
            output.status.success(),
            "child process returned {:#?}",
            output
        );

        // Confirm that we got the right bytes.
        assert_eq!(b"stderr: quack", &*output.stdout);
        assert_eq!(b"stdout: quack", &*output.stderr);
    }

    #[test]
    fn test_parent_handles_dont_close() {
        // Open and close each parent pipe multiple times. If this closes the
        // original, subsequent opens should fail.
        let stdin = crate::dup_stdin().unwrap();
        drop(stdin);
        let stdin = crate::dup_stdin().unwrap();
        drop(stdin);

        let stdout = crate::dup_stdout().unwrap();
        drop(stdout);
        let stdout = crate::dup_stdout().unwrap();
        drop(stdout);

        let stderr = crate::dup_stderr().unwrap();
        drop(stderr);
        let stderr = crate::dup_stderr().unwrap();
        drop(stderr);
    }

    #[test]
    fn test_try_clone() {
        let (reader, writer) = crate::pipe().unwrap();
        let mut reader_clone = reader.try_clone().unwrap();
        let mut writer_clone = writer.try_clone().unwrap();
        // A small write won't fill the pipe buffer, so it won't block this thread.
        writer_clone.write_all(b"some stuff").unwrap();
        drop(writer);
        drop(writer_clone);
        let mut out = String::new();
        reader_clone.read_to_string(&mut out).unwrap();
        assert_eq!(out, "some stuff");
    }

    #[test]
    fn test_debug() {
        let (reader, writer) = crate::pipe().unwrap();
        _ = format!("{:?} {:?}", reader, writer);
    }
}