• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Unix pipe types.
2 
3 use crate::io::interest::Interest;
4 use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
5 
6 use mio::unix::pipe as mio_pipe;
7 use std::fs::File;
8 use std::io::{self, Read, Write};
9 use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
10 use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
11 use std::path::Path;
12 use std::pin::Pin;
13 use std::task::{Context, Poll};
14 
15 cfg_io_util! {
16     use bytes::BufMut;
17 }
18 
19 /// Options and flags which can be used to configure how a FIFO file is opened.
20 ///
21 /// This builder allows configuring how to create a pipe end from a FIFO file.
22 /// Generally speaking, when using `OpenOptions`, you'll first call [`new`],
23 /// then chain calls to methods to set each option, then call either
24 /// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you
25 /// are trying to open. This will give you a [`io::Result`][result] with a pipe
26 /// end inside that you can further operate on.
27 ///
28 /// [`new`]: OpenOptions::new
29 /// [`open_receiver`]: OpenOptions::open_receiver
30 /// [`open_sender`]: OpenOptions::open_sender
31 /// [result]: std::io::Result
32 ///
33 /// # Examples
34 ///
35 /// Opening a pair of pipe ends from a FIFO file:
36 ///
37 /// ```no_run
38 /// use tokio::net::unix::pipe;
39 /// # use std::error::Error;
40 ///
41 /// const FIFO_NAME: &str = "path/to/a/fifo";
42 ///
43 /// # async fn dox() -> Result<(), Box<dyn Error>> {
44 /// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
45 /// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?;
46 /// # Ok(())
47 /// # }
48 /// ```
49 ///
50 /// Opening a [`Sender`] on Linux when you are sure the file is a FIFO:
51 ///
52 /// ```ignore
53 /// use tokio::net::unix::pipe;
54 /// use nix::{unistd::mkfifo, sys::stat::Mode};
55 /// # use std::error::Error;
56 ///
57 /// // Our program has exclusive access to this path.
58 /// const FIFO_NAME: &str = "path/to/a/new/fifo";
59 ///
60 /// # async fn dox() -> Result<(), Box<dyn Error>> {
61 /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
62 /// let tx = pipe::OpenOptions::new()
63 ///     .read_write(true)
64 ///     .unchecked(true)
65 ///     .open_sender(FIFO_NAME)?;
66 /// # Ok(())
67 /// # }
68 /// ```
#[derive(Clone, Debug)]
pub struct OpenOptions {
    // When set, the FIFO is opened with both read and write access.
    // The field only exists on Linux builds.
    #[cfg(target_os = "linux")]
    read_write: bool,
    // When set, opening skips the check that the file is a FIFO.
    unchecked: bool,
}
75 
76 impl OpenOptions {
77     /// Creates a blank new set of options ready for configuration.
78     ///
79     /// All options are initially set to `false`.
new() -> OpenOptions80     pub fn new() -> OpenOptions {
81         OpenOptions {
82             #[cfg(target_os = "linux")]
83             read_write: false,
84             unchecked: false,
85         }
86     }
87 
88     /// Sets the option for read-write access.
89     ///
90     /// This option, when true, will indicate that a FIFO file will be opened
91     /// in read-write access mode. This operation is not defined by the POSIX
92     /// standard and is only guaranteed to work on Linux.
93     ///
94     /// # Examples
95     ///
96     /// Opening a [`Sender`] even if there are no open reading ends:
97     ///
98     /// ```ignore
99     /// use tokio::net::unix::pipe;
100     ///
101     /// let tx = pipe::OpenOptions::new()
102     ///     .read_write(true)
103     ///     .open_sender("path/to/a/fifo");
104     /// ```
105     ///
106     /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
107     /// fail with [`UnexpectedEof`] during reading if all writing ends of the
108     /// pipe close the FIFO file.
109     ///
110     /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
111     ///
112     /// ```ignore
113     /// use tokio::net::unix::pipe;
114     ///
115     /// let rx = pipe::OpenOptions::new()
116     ///     .read_write(true)
117     ///     .open_receiver("path/to/a/fifo");
118     /// ```
119     #[cfg(target_os = "linux")]
120     #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
    pub fn read_write(&mut self, value: bool) -> &mut Self {
        // Record the flag; it is consulted later when the FIFO is opened.
        self.read_write = value;
        // Hand the builder back so calls can be chained.
        self
    }
125 
126     /// Sets the option to skip the check for FIFO file type.
127     ///
128     /// By default, [`open_receiver`] and [`open_sender`] functions will check
129     /// if the opened file is a FIFO file. Set this option to `true` if you are
130     /// sure the file is a FIFO file.
131     ///
132     /// [`open_receiver`]: OpenOptions::open_receiver
133     /// [`open_sender`]: OpenOptions::open_sender
134     ///
135     /// # Examples
136     ///
137     /// ```no_run
138     /// use tokio::net::unix::pipe;
139     /// use nix::{unistd::mkfifo, sys::stat::Mode};
140     /// # use std::error::Error;
141     ///
142     /// // Our program has exclusive access to this path.
143     /// const FIFO_NAME: &str = "path/to/a/new/fifo";
144     ///
145     /// # async fn dox() -> Result<(), Box<dyn Error>> {
146     /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
147     /// let rx = pipe::OpenOptions::new()
148     ///     .unchecked(true)
149     ///     .open_receiver(FIFO_NAME)?;
150     /// # Ok(())
151     /// # }
152     /// ```
    pub fn unchecked(&mut self, value: bool) -> &mut Self {
        // Record whether the FIFO file-type check should be skipped on open.
        self.unchecked = value;
        // Hand the builder back so calls can be chained.
        self
    }
157 
158     /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`.
159     ///
160     /// This function will open the FIFO file at the specified path, possibly
161     /// check if it is a pipe, and associate the pipe with the default event
162     /// loop for reading.
163     ///
164     /// # Errors
165     ///
166     /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
167     /// This function may also fail with other standard OS errors.
168     ///
169     /// # Panics
170     ///
171     /// This function panics if it is not called from within a runtime with
172     /// IO enabled.
173     ///
174     /// The runtime is usually set implicitly when this function is called
175     /// from a future driven by a tokio runtime, otherwise runtime can be set
176     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver>177     pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
178         let file = self.open(path.as_ref(), PipeEnd::Receiver)?;
179         Receiver::from_file_unchecked(file)
180     }
181 
182     /// Creates a [`Sender`] from a FIFO file with the options specified by `self`.
183     ///
184     /// This function will open the FIFO file at the specified path, possibly
185     /// check if it is a pipe, and associate the pipe with the default event
186     /// loop for writing.
187     ///
188     /// # Errors
189     ///
190     /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
191     /// If the file is not opened in read-write access mode and the file is not
192     /// currently open for reading, this function will fail with `ENXIO`.
193     /// This function may also fail with other standard OS errors.
194     ///
195     /// # Panics
196     ///
197     /// This function panics if it is not called from within a runtime with
198     /// IO enabled.
199     ///
200     /// The runtime is usually set implicitly when this function is called
201     /// from a future driven by a tokio runtime, otherwise runtime can be set
202     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender>203     pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
204         let file = self.open(path.as_ref(), PipeEnd::Sender)?;
205         Sender::from_file_unchecked(file)
206     }
207 
open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File>208     fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
209         let mut options = std::fs::OpenOptions::new();
210         options
211             .read(pipe_end == PipeEnd::Receiver)
212             .write(pipe_end == PipeEnd::Sender)
213             .custom_flags(libc::O_NONBLOCK);
214 
215         #[cfg(target_os = "linux")]
216         if self.read_write {
217             options.read(true).write(true);
218         }
219 
220         let file = options.open(path)?;
221 
222         if !self.unchecked && !is_fifo(&file)? {
223             return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
224         }
225 
226         Ok(file)
227     }
228 }
229 
230 impl Default for OpenOptions {
default() -> OpenOptions231     fn default() -> OpenOptions {
232         OpenOptions::new()
233     }
234 }
235 
/// Identifies which end of the pipe is being opened.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum PipeEnd {
    /// The writing end.
    Sender,
    /// The reading end.
    Receiver,
}
241 
242 /// Writing end of a Unix pipe.
243 ///
244 /// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
245 ///
246 /// Opening a named pipe for writing involves a few steps.
247 /// Call to [`OpenOptions::open_sender`] might fail with an error indicating
248 /// different things:
249 ///
250 /// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
251 /// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
252 /// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
253 ///   Sleep for a while and try again.
254 /// * Other OS errors not specific to opening FIFO files.
255 ///
256 /// Opening a `Sender` from a FIFO file should look like this:
257 ///
258 /// ```no_run
259 /// use tokio::net::unix::pipe;
260 /// use tokio::time::{self, Duration};
261 ///
262 /// const FIFO_NAME: &str = "path/to/a/fifo";
263 ///
264 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
265 /// // Wait for a reader to open the file.
266 /// let tx = loop {
267 ///     match pipe::OpenOptions::new().open_sender(FIFO_NAME) {
268 ///         Ok(tx) => break tx,
269 ///         Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {},
270 ///         Err(e) => return Err(e.into()),
271 ///     }
272 ///
273 ///     time::sleep(Duration::from_millis(50)).await;
274 /// };
275 /// # Ok(())
276 /// # }
277 /// ```
278 ///
279 /// On Linux, it is possible to create a `Sender` without waiting in a sleeping
280 /// loop. This is done by opening a named pipe in read-write access mode with
281 /// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
282 /// both a writing end and a reading end, and the latter allows opening a FIFO
283 /// without [`ENXIO`] error since the pipe is open for reading as well.
284 ///
285 /// `Sender` cannot be used to read from a pipe, so in practice the read access
286 /// is only used when a FIFO is opened. However, using a `Sender` in read-write
287 /// mode **may lead to lost data**, because written data will be dropped by the
288 /// system as soon as all pipe ends are closed. To avoid lost data you have to
289 /// make sure that a reading end has been opened before dropping a `Sender`.
290 ///
291 /// Note that using read-write access mode with FIFO files is not defined by
292 /// the POSIX standard and it is only guaranteed to work on Linux.
293 ///
294 /// ```ignore
295 /// use tokio::io::AsyncWriteExt;
296 /// use tokio::net::unix::pipe;
297 ///
298 /// const FIFO_NAME: &str = "path/to/a/fifo";
299 ///
300 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
301 /// let mut tx = pipe::OpenOptions::new()
302 ///     .read_write(true)
303 ///     .open_sender(FIFO_NAME)?;
304 ///
305 /// // Asynchronously write to the pipe before a reader.
306 /// tx.write_all(b"hello world").await?;
307 /// # Ok(())
308 /// # }
309 /// ```
310 ///
311 /// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
#[derive(Debug)]
pub struct Sender {
    // The mio pipe sender wrapped in tokio's `PollEvented`; all readiness
    // queries and writes in this type go through this handle.
    io: PollEvented<mio_pipe::Sender>,
}
316 
317 impl Sender {
from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender>318     fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> {
319         let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?;
320         Ok(Sender { io })
321     }
322 
323     /// Creates a new `Sender` from a [`File`].
324     ///
325     /// This function is intended to construct a pipe from a [`File`] representing
326     /// a special FIFO file. It will check if the file is a pipe and has write access,
327     /// set it in non-blocking mode and perform the conversion.
328     ///
329     /// # Errors
330     ///
331     /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
332     /// does not have write access. Also fails with any standard OS error if it occurs.
333     ///
334     /// # Panics
335     ///
336     /// This function panics if it is not called from within a runtime with
337     /// IO enabled.
338     ///
339     /// The runtime is usually set implicitly when this function is called
340     /// from a future driven by a tokio runtime, otherwise runtime can be set
341     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file(mut file: File) -> io::Result<Sender>342     pub fn from_file(mut file: File) -> io::Result<Sender> {
343         if !is_fifo(&file)? {
344             return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
345         }
346 
347         let flags = get_file_flags(&file)?;
348         if has_write_access(flags) {
349             set_nonblocking(&mut file, flags)?;
350             Sender::from_file_unchecked(file)
351         } else {
352             Err(io::Error::new(
353                 io::ErrorKind::InvalidInput,
354                 "not in O_WRONLY or O_RDWR access mode",
355             ))
356         }
357     }
358 
359     /// Creates a new `Sender` from a [`File`] without checking pipe properties.
360     ///
361     /// This function is intended to construct a pipe from a File representing
362     /// a special FIFO file. The conversion assumes nothing about the underlying
363     /// file; it is left up to the user to make sure it is opened with write access,
364     /// represents a pipe and is set in non-blocking mode.
365     ///
366     /// # Examples
367     ///
368     /// ```no_run
369     /// use tokio::net::unix::pipe;
370     /// use std::fs::OpenOptions;
371     /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
372     /// # use std::error::Error;
373     ///
374     /// const FIFO_NAME: &str = "path/to/a/fifo";
375     ///
376     /// # async fn dox() -> Result<(), Box<dyn Error>> {
377     /// let file = OpenOptions::new()
378     ///     .write(true)
379     ///     .custom_flags(libc::O_NONBLOCK)
380     ///     .open(FIFO_NAME)?;
381     /// if file.metadata()?.file_type().is_fifo() {
382     ///     let tx = pipe::Sender::from_file_unchecked(file)?;
383     ///     /* use the Sender */
384     /// }
385     /// # Ok(())
386     /// # }
387     /// ```
388     ///
389     /// # Panics
390     ///
391     /// This function panics if it is not called from within a runtime with
392     /// IO enabled.
393     ///
394     /// The runtime is usually set implicitly when this function is called
395     /// from a future driven by a tokio runtime, otherwise runtime can be set
396     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file_unchecked(file: File) -> io::Result<Sender>397     pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
398         let raw_fd = file.into_raw_fd();
399         let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) };
400         Sender::from_mio(mio_tx)
401     }
402 
403     /// Waits for any of the requested ready states.
404     ///
405     /// This function can be used instead of [`writable()`] to check the returned
406     /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events.
407     ///
408     /// The function may complete without the pipe being ready. This is a
409     /// false-positive and attempting an operation will return with
410     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
411     /// [`Ready`] set, so you should always check the returned value and possibly
412     /// wait again if the requested states are not set.
413     ///
414     /// [`writable()`]: Self::writable
415     ///
416     /// # Cancel safety
417     ///
418     /// This method is cancel safe. Once a readiness event occurs, the method
419     /// will continue to return immediately until the readiness event is
420     /// consumed by an attempt to write that fails with `WouldBlock` or
421     /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>422     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
423         let event = self.io.registration().readiness(interest).await?;
424         Ok(event.ready)
425     }
426 
427     /// Waits for the pipe to become writable.
428     ///
429     /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
430     /// paired with [`try_write()`].
431     ///
432     /// [`try_write()`]: Self::try_write
433     ///
434     /// # Examples
435     ///
436     /// ```no_run
437     /// use tokio::net::unix::pipe;
438     /// use std::io;
439     ///
440     /// #[tokio::main]
441     /// async fn main() -> io::Result<()> {
442     ///     // Open a writing end of a fifo
443     ///     let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
444     ///
445     ///     loop {
446     ///         // Wait for the pipe to be writable
447     ///         tx.writable().await?;
448     ///
449     ///         // Try to write data, this may still fail with `WouldBlock`
450     ///         // if the readiness event is a false positive.
451     ///         match tx.try_write(b"hello world") {
452     ///             Ok(n) => {
453     ///                 break;
454     ///             }
455     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
456     ///                 continue;
457     ///             }
458     ///             Err(e) => {
459     ///                 return Err(e.into());
460     ///             }
461     ///         }
462     ///     }
463     ///
464     ///     Ok(())
465     /// }
466     /// ```
writable(&self) -> io::Result<()>467     pub async fn writable(&self) -> io::Result<()> {
468         self.ready(Interest::WRITABLE).await?;
469         Ok(())
470     }
471 
472     /// Polls for write readiness.
473     ///
474     /// If the pipe is not currently ready for writing, this method will
475     /// store a clone of the `Waker` from the provided `Context`. When the pipe
476     /// becomes ready for writing, `Waker::wake` will be called on the waker.
477     ///
478     /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
479     /// the `Waker` from the `Context` passed to the most recent call is
480     /// scheduled to receive a wakeup.
481     ///
482     /// This function is intended for cases where creating and pinning a future
483     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
484     /// preferred, as this supports polling from multiple tasks at once.
485     ///
486     /// [`writable`]: Self::writable
487     ///
488     /// # Return value
489     ///
490     /// The function returns:
491     ///
492     /// * `Poll::Pending` if the pipe is not ready for writing.
493     /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
494     /// * `Poll::Ready(Err(e))` if an error is encountered.
495     ///
496     /// # Errors
497     ///
498     /// This function may encounter any standard I/O error except `WouldBlock`.
    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Delegate to the registration and discard the readiness payload,
        // keeping only the ready/error/pending outcome.
        self.io.registration().poll_write_ready(cx).map_ok(|_| ())
    }
502 
503     /// Tries to write a buffer to the pipe, returning how many bytes were
504     /// written.
505     ///
506     /// The function will attempt to write the entire contents of `buf`, but
507     /// only part of the buffer may be written. If the length of `buf` is not
508     /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the
509     /// write is guaranteed to be atomic, i.e. either the entire content of
510     /// `buf` will be written or this method will fail with `WouldBlock`. There
511     /// is no such guarantee if `buf` is larger than `PIPE_BUF`.
512     ///
513     /// This function is usually paired with [`writable`].
514     ///
515     /// [`writable`]: Self::writable
516     ///
517     /// # Return
518     ///
519     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
520     /// number of bytes written. If the pipe is not ready to write data,
521     /// `Err(io::ErrorKind::WouldBlock)` is returned.
522     ///
523     /// # Examples
524     ///
525     /// ```no_run
526     /// use tokio::net::unix::pipe;
527     /// use std::io;
528     ///
529     /// #[tokio::main]
530     /// async fn main() -> io::Result<()> {
531     ///     // Open a writing end of a fifo
532     ///     let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
533     ///
534     ///     loop {
535     ///         // Wait for the pipe to be writable
536     ///         tx.writable().await?;
537     ///
538     ///         // Try to write data, this may still fail with `WouldBlock`
539     ///         // if the readiness event is a false positive.
540     ///         match tx.try_write(b"hello world") {
541     ///             Ok(n) => {
542     ///                 break;
543     ///             }
544     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
545     ///                 continue;
546     ///             }
547     ///             Err(e) => {
548     ///                 return Err(e.into());
549     ///             }
550     ///         }
551     ///     }
552     ///
553     ///     Ok(())
554     /// }
555     /// ```
    pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
        // The write itself happens inside the closure handed to `try_io`,
        // through a shared reference to the wrapped I/O object, which is why
        // `&self` is enough.
        // NOTE(review): `try_io` presumably clears the cached write readiness
        // when the closure reports `WouldBlock` — confirm in the registration code.
        self.io
            .registration()
            .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
    }
561 
562     /// Tries to write several buffers to the pipe, returning how many bytes
563     /// were written.
564     ///
565     /// Data is written from each buffer in order, with the final buffer read
566     /// from possibly being only partially consumed. This method behaves
567     /// equivalently to a single call to [`try_write()`] with concatenated
568     /// buffers.
569     ///
570     /// If the total length of buffers is not greater than `PIPE_BUF` (an OS
571     /// constant, 4096 under Linux), then the write is guaranteed to be atomic,
572     /// i.e. either the entire contents of buffers will be written or this
573     /// method will fail with `WouldBlock`. There is no such guarantee if the
574     /// total length of buffers is greater than `PIPE_BUF`.
575     ///
576     /// This function is usually paired with [`writable`].
577     ///
578     /// [`try_write()`]: Self::try_write()
579     /// [`writable`]: Self::writable
580     ///
581     /// # Return
582     ///
583     /// If data is successfully written, `Ok(n)` is returned, where `n` is the
584     /// number of bytes written. If the pipe is not ready to write data,
585     /// `Err(io::ErrorKind::WouldBlock)` is returned.
586     ///
587     /// # Examples
588     ///
589     /// ```no_run
590     /// use tokio::net::unix::pipe;
591     /// use std::io;
592     ///
593     /// #[tokio::main]
594     /// async fn main() -> io::Result<()> {
595     ///     // Open a writing end of a fifo
596     ///     let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
597     ///
598     ///     let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
599     ///
600     ///     loop {
601     ///         // Wait for the pipe to be writable
602     ///         tx.writable().await?;
603     ///
604     ///         // Try to write data, this may still fail with `WouldBlock`
605     ///         // if the readiness event is a false positive.
606     ///         match tx.try_write_vectored(&bufs) {
607     ///             Ok(n) => {
608     ///                 break;
609     ///             }
610     ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
611     ///                 continue;
612     ///             }
613     ///             Err(e) => {
614     ///                 return Err(e.into());
615     ///             }
616     ///         }
617     ///     }
618     ///
619     ///     Ok(())
620     /// }
621     /// ```
    pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
        // Same shape as `try_write`: the vectored write is performed inside
        // the closure handed to `try_io`, through a shared reference to the
        // wrapped I/O object.
        self.io
            .registration()
            .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
    }
627 }
628 
// `AsyncWrite` for the writing end: writes are delegated to the wrapped
// `PollEvented`, while flush and shutdown complete immediately.
impl AsyncWrite for Sender {
    // Forwards a single-buffer write to the underlying `PollEvented`.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        self.io.poll_write(cx, buf)
    }

    // Forwards a vectored write to the underlying `PollEvented`.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[io::IoSlice<'_>],
    ) -> Poll<io::Result<usize>> {
        self.io.poll_write_vectored(cx, bufs)
    }

    // Advertises that `poll_write_vectored` is implemented by this type.
    fn is_write_vectored(&self) -> bool {
        true
    }

    // Always ready: this implementation performs no work on flush.
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    // Always ready: this implementation performs no work on shutdown.
    fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}
658 
impl AsRawFd for Sender {
    // Exposes the raw descriptor reported by the wrapped I/O object.
    fn as_raw_fd(&self) -> RawFd {
        self.io.as_raw_fd()
    }
}
664 
impl AsFd for Sender {
    fn as_fd(&self) -> BorrowedFd<'_> {
        // The raw descriptor comes from `self.io`, and the returned
        // `BorrowedFd` cannot outlive the `&self` borrow.
        // NOTE(review): soundness assumes the descriptor stays open for as
        // long as `self` is alive — confirm `PollEvented` never closes it early.
        unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
    }
}
670 
671 /// Reading end of a Unix pipe.
672 ///
673 /// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
674 ///
675 /// # Examples
676 ///
677 /// Receiving messages from a named pipe in a loop:
678 ///
679 /// ```no_run
680 /// use tokio::net::unix::pipe;
681 /// use tokio::io::{self, AsyncReadExt};
682 ///
683 /// const FIFO_NAME: &str = "path/to/a/fifo";
684 ///
685 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
686 /// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
687 /// loop {
688 ///     let mut msg = vec![0; 256];
689 ///     match rx.read_exact(&mut msg).await {
690 ///         Ok(_) => {
691 ///             /* handle the message */
692 ///         }
693 ///         Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
694 ///             // Writing end has been closed, we should reopen the pipe.
695 ///             rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
696 ///         }
697 ///         Err(e) => return Err(e.into()),
698 ///     }
699 /// }
700 /// # }
701 /// ```
702 ///
703 /// On Linux, you can use a `Receiver` in read-write access mode to implement
704 /// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
705 /// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
706 /// when the writing end is closed. This way, a `Receiver` can asynchronously
707 /// wait for the next writer to open the pipe.
708 ///
709 /// You should not use functions waiting for EOF such as [`read_to_end`] with
710 /// a `Receiver` in read-write access mode, since it **may wait forever**.
711 /// `Receiver` in this mode also holds an open writing end, which prevents
712 /// receiving EOF.
713 ///
714 /// To set the read-write access mode you can use `OpenOptions::read_write`.
715 /// Note that using read-write access mode with FIFO files is not defined by
716 /// the POSIX standard and it is only guaranteed to work on Linux.
717 ///
718 /// ```ignore
719 /// use tokio::net::unix::pipe;
720 /// use tokio::io::AsyncReadExt;
721 /// # use std::error::Error;
722 ///
723 /// const FIFO_NAME: &str = "path/to/a/fifo";
724 ///
725 /// # async fn dox() -> Result<(), Box<dyn Error>> {
726 /// let mut rx = pipe::OpenOptions::new()
727 ///     .read_write(true)
728 ///     .open_receiver(FIFO_NAME)?;
729 /// loop {
730 ///     let mut msg = vec![0; 256];
731 ///     rx.read_exact(&mut msg).await?;
732 ///     /* handle the message */
733 /// }
734 /// # }
735 /// ```
736 ///
737 /// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end
#[derive(Debug)]
pub struct Receiver {
    // The mio pipe receiver wrapped in tokio's `PollEvented`.
    io: PollEvented<mio_pipe::Receiver>,
}
742 
743 impl Receiver {
from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver>744     fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> {
745         let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?;
746         Ok(Receiver { io })
747     }
748 
749     /// Creates a new `Receiver` from a [`File`].
750     ///
751     /// This function is intended to construct a pipe from a [`File`] representing
752     /// a special FIFO file. It will check if the file is a pipe and has read access,
753     /// set it in non-blocking mode and perform the conversion.
754     ///
755     /// # Errors
756     ///
757     /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
758     /// does not have read access. Also fails with any standard OS error if it occurs.
759     ///
760     /// # Panics
761     ///
762     /// This function panics if it is not called from within a runtime with
763     /// IO enabled.
764     ///
765     /// The runtime is usually set implicitly when this function is called
766     /// from a future driven by a tokio runtime, otherwise runtime can be set
767     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file(mut file: File) -> io::Result<Receiver>768     pub fn from_file(mut file: File) -> io::Result<Receiver> {
769         if !is_fifo(&file)? {
770             return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
771         }
772 
773         let flags = get_file_flags(&file)?;
774         if has_read_access(flags) {
775             set_nonblocking(&mut file, flags)?;
776             Receiver::from_file_unchecked(file)
777         } else {
778             Err(io::Error::new(
779                 io::ErrorKind::InvalidInput,
780                 "not in O_RDONLY or O_RDWR access mode",
781             ))
782         }
783     }
784 
785     /// Creates a new `Receiver` from a [`File`] without checking pipe properties.
786     ///
787     /// This function is intended to construct a pipe from a File representing
788     /// a special FIFO file. The conversion assumes nothing about the underlying
789     /// file; it is left up to the user to make sure it is opened with read access,
790     /// represents a pipe and is set in non-blocking mode.
791     ///
792     /// # Examples
793     ///
794     /// ```no_run
795     /// use tokio::net::unix::pipe;
796     /// use std::fs::OpenOptions;
797     /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
798     /// # use std::error::Error;
799     ///
800     /// const FIFO_NAME: &str = "path/to/a/fifo";
801     ///
802     /// # async fn dox() -> Result<(), Box<dyn Error>> {
803     /// let file = OpenOptions::new()
804     ///     .read(true)
805     ///     .custom_flags(libc::O_NONBLOCK)
806     ///     .open(FIFO_NAME)?;
807     /// if file.metadata()?.file_type().is_fifo() {
808     ///     let rx = pipe::Receiver::from_file_unchecked(file)?;
809     ///     /* use the Receiver */
810     /// }
811     /// # Ok(())
812     /// # }
813     /// ```
814     ///
815     /// # Panics
816     ///
817     /// This function panics if it is not called from within a runtime with
818     /// IO enabled.
819     ///
820     /// The runtime is usually set implicitly when this function is called
821     /// from a future driven by a tokio runtime, otherwise runtime can be set
822     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file_unchecked(file: File) -> io::Result<Receiver>823     pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
824         let raw_fd = file.into_raw_fd();
825         let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) };
826         Receiver::from_mio(mio_rx)
827     }
828 
829     /// Waits for any of the requested ready states.
830     ///
831     /// This function can be used instead of [`readable()`] to check the returned
832     /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events.
833     ///
834     /// The function may complete without the pipe being ready. This is a
835     /// false-positive and attempting an operation will return with
836     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
837     /// [`Ready`] set, so you should always check the returned value and possibly
838     /// wait again if the requested states are not set.
839     ///
840     /// [`readable()`]: Self::readable
841     ///
842     /// # Cancel safety
843     ///
844     /// This method is cancel safe. Once a readiness event occurs, the method
845     /// will continue to return immediately until the readiness event is
846     /// consumed by an attempt to read that fails with `WouldBlock` or
847     /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>848     pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
849         let event = self.io.registration().readiness(interest).await?;
850         Ok(event.ready)
851     }
852 
853     /// Waits for the pipe to become readable.
854     ///
855     /// This function is equivalent to `ready(Interest::READABLE)` and is usually
856     /// paired with [`try_read()`].
857     ///
858     /// [`try_read()`]: Self::try_read()
859     ///
860     /// # Examples
861     ///
862     /// ```no_run
863     /// use tokio::net::unix::pipe;
864     /// use std::io;
865     ///
866     /// #[tokio::main]
867     /// async fn main() -> io::Result<()> {
868     ///     // Open a reading end of a fifo
869     ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
870     ///
871     ///     let mut msg = vec![0; 1024];
872     ///
873     ///     loop {
874     ///         // Wait for the pipe to be readable
875     ///         rx.readable().await?;
876     ///
877     ///         // Try to read data, this may still fail with `WouldBlock`
878     ///         // if the readiness event is a false positive.
879     ///         match rx.try_read(&mut msg) {
880     ///             Ok(n) => {
881     ///                 msg.truncate(n);
882     ///                 break;
883     ///             }
884     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
885     ///                 continue;
886     ///             }
887     ///             Err(e) => {
888     ///                 return Err(e.into());
889     ///             }
890     ///         }
891     ///     }
892     ///
893     ///     println!("GOT = {:?}", msg);
894     ///     Ok(())
895     /// }
896     /// ```
readable(&self) -> io::Result<()>897     pub async fn readable(&self) -> io::Result<()> {
898         self.ready(Interest::READABLE).await?;
899         Ok(())
900     }
901 
902     /// Polls for read readiness.
903     ///
904     /// If the pipe is not currently ready for reading, this method will
905     /// store a clone of the `Waker` from the provided `Context`. When the pipe
906     /// becomes ready for reading, `Waker::wake` will be called on the waker.
907     ///
908     /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
909     /// the `Waker` from the `Context` passed to the most recent call is
910     /// scheduled to receive a wakeup.
911     ///
912     /// This function is intended for cases where creating and pinning a future
913     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
914     /// preferred, as this supports polling from multiple tasks at once.
915     ///
916     /// [`readable`]: Self::readable
917     ///
918     /// # Return value
919     ///
920     /// The function returns:
921     ///
922     /// * `Poll::Pending` if the pipe is not ready for reading.
923     /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
924     /// * `Poll::Ready(Err(e))` if an error is encountered.
925     ///
926     /// # Errors
927     ///
928     /// This function may encounter any standard I/O error except `WouldBlock`.
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>929     pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
930         self.io.registration().poll_read_ready(cx).map_ok(|_| ())
931     }
932 
933     /// Tries to read data from the pipe into the provided buffer, returning how
934     /// many bytes were read.
935     ///
936     /// Reads any pending data from the pipe but does not wait for new data
937     /// to arrive. On success, returns the number of bytes read. Because
938     /// `try_read()` is non-blocking, the buffer does not have to be stored by
939     /// the async task and can exist entirely on the stack.
940     ///
941     /// Usually [`readable()`] is used with this function.
942     ///
943     /// [`readable()`]: Self::readable()
944     ///
945     /// # Return
946     ///
947     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
948     /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
949     ///
950     /// 1. The pipe's writing end is closed and will no longer write data.
951     /// 2. The specified buffer was 0 bytes in length.
952     ///
953     /// If the pipe is not ready to read data,
954     /// `Err(io::ErrorKind::WouldBlock)` is returned.
955     ///
956     /// # Examples
957     ///
958     /// ```no_run
959     /// use tokio::net::unix::pipe;
960     /// use std::io;
961     ///
962     /// #[tokio::main]
963     /// async fn main() -> io::Result<()> {
964     ///     // Open a reading end of a fifo
965     ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
966     ///
967     ///     let mut msg = vec![0; 1024];
968     ///
969     ///     loop {
970     ///         // Wait for the pipe to be readable
971     ///         rx.readable().await?;
972     ///
973     ///         // Try to read data, this may still fail with `WouldBlock`
974     ///         // if the readiness event is a false positive.
975     ///         match rx.try_read(&mut msg) {
976     ///             Ok(n) => {
977     ///                 msg.truncate(n);
978     ///                 break;
979     ///             }
980     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
981     ///                 continue;
982     ///             }
983     ///             Err(e) => {
984     ///                 return Err(e.into());
985     ///             }
986     ///         }
987     ///     }
988     ///
989     ///     println!("GOT = {:?}", msg);
990     ///     Ok(())
991     /// }
992     /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>993     pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
994         self.io
995             .registration()
996             .try_io(Interest::READABLE, || (&*self.io).read(buf))
997     }
998 
999     /// Tries to read data from the pipe into the provided buffers, returning
1000     /// how many bytes were read.
1001     ///
1002     /// Data is copied to fill each buffer in order, with the final buffer
1003     /// written to possibly being only partially filled. This method behaves
1004     /// equivalently to a single call to [`try_read()`] with concatenated
1005     /// buffers.
1006     ///
1007     /// Reads any pending data from the pipe but does not wait for new data
1008     /// to arrive. On success, returns the number of bytes read. Because
1009     /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1010     /// stored by the async task and can exist entirely on the stack.
1011     ///
1012     /// Usually, [`readable()`] is used with this function.
1013     ///
1014     /// [`try_read()`]: Self::try_read()
1015     /// [`readable()`]: Self::readable()
1016     ///
1017     /// # Return
1018     ///
1019     /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1020     /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1021     /// closed and will no longer write data. If the pipe is not ready to read
1022     /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1023     ///
1024     /// # Examples
1025     ///
1026     /// ```no_run
1027     /// use tokio::net::unix::pipe;
1028     /// use std::io;
1029     ///
1030     /// #[tokio::main]
1031     /// async fn main() -> io::Result<()> {
1032     ///     // Open a reading end of a fifo
1033     ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1034     ///
1035     ///     loop {
1036     ///         // Wait for the pipe to be readable
1037     ///         rx.readable().await?;
1038     ///
1039     ///         // Creating the buffer **after** the `await` prevents it from
1040     ///         // being stored in the async task.
1041     ///         let mut buf_a = [0; 512];
1042     ///         let mut buf_b = [0; 1024];
1043     ///         let mut bufs = [
1044     ///             io::IoSliceMut::new(&mut buf_a),
1045     ///             io::IoSliceMut::new(&mut buf_b),
1046     ///         ];
1047     ///
1048     ///         // Try to read data, this may still fail with `WouldBlock`
1049     ///         // if the readiness event is a false positive.
1050     ///         match rx.try_read_vectored(&mut bufs) {
1051     ///             Ok(0) => break,
1052     ///             Ok(n) => {
1053     ///                 println!("read {} bytes", n);
1054     ///             }
1055     ///             Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1056     ///                 continue;
1057     ///             }
1058     ///             Err(e) => {
1059     ///                 return Err(e.into());
1060     ///             }
1061     ///         }
1062     ///     }
1063     ///
1064     ///     Ok(())
1065     /// }
1066     /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>1067     pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1068         self.io
1069             .registration()
1070             .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1071     }
1072 
1073     cfg_io_util! {
1074         /// Tries to read data from the pipe into the provided buffer, advancing the
1075         /// buffer's internal cursor, returning how many bytes were read.
1076         ///
1077         /// Reads any pending data from the pipe but does not wait for new data
1078         /// to arrive. On success, returns the number of bytes read. Because
1079         /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1080         /// the async task and can exist entirely on the stack.
1081         ///
1082         /// Usually, [`readable()`] or [`ready()`] is used with this function.
1083         ///
1084         /// [`readable()`]: Self::readable
1085         /// [`ready()`]: Self::ready
1086         ///
1087         /// # Return
1088         ///
1089         /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1090         /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1091         /// closed and will no longer write data. If the pipe is not ready to read
1092         /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1093         ///
1094         /// # Examples
1095         ///
1096         /// ```no_run
1097         /// use tokio::net::unix::pipe;
1098         /// use std::io;
1099         ///
1100         /// #[tokio::main]
1101         /// async fn main() -> io::Result<()> {
1102         ///     // Open a reading end of a fifo
1103         ///     let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1104         ///
1105         ///     loop {
1106         ///         // Wait for the pipe to be readable
1107         ///         rx.readable().await?;
1108         ///
1109         ///         let mut buf = Vec::with_capacity(4096);
1110         ///
1111         ///         // Try to read data, this may still fail with `WouldBlock`
1112         ///         // if the readiness event is a false positive.
1113         ///         match rx.try_read_buf(&mut buf) {
1114         ///             Ok(0) => break,
1115         ///             Ok(n) => {
1116         ///                 println!("read {} bytes", n);
1117         ///             }
1118         ///             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1119         ///                 continue;
1120         ///             }
1121         ///             Err(e) => {
1122         ///                 return Err(e.into());
1123         ///             }
1124         ///         }
1125         ///     }
1126         ///
1127         ///     Ok(())
1128         /// }
1129         /// ```
1130         pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
1131             self.io.registration().try_io(Interest::READABLE, || {
1132                 use std::io::Read;
1133 
1134                 let dst = buf.chunk_mut();
1135                 let dst =
1136                     unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };
1137 
1138                 // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1139                 // which correctly handles reads into uninitialized memory.
1140                 let n = (&*self.io).read(dst)?;
1141 
1142                 unsafe {
1143                     buf.advance_mut(n);
1144                 }
1145 
1146                 Ok(n)
1147             })
1148         }
1149     }
1150 }
1151 
1152 impl AsyncRead for Receiver {
poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<io::Result<()>>1153     fn poll_read(
1154         self: Pin<&mut Self>,
1155         cx: &mut Context<'_>,
1156         buf: &mut ReadBuf<'_>,
1157     ) -> Poll<io::Result<()>> {
1158         // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
1159         // which correctly handles reads into uninitialized memory.
1160         unsafe { self.io.poll_read(cx, buf) }
1161     }
1162 }
1163 
1164 impl AsRawFd for Receiver {
as_raw_fd(&self) -> RawFd1165     fn as_raw_fd(&self) -> RawFd {
1166         self.io.as_raw_fd()
1167     }
1168 }
1169 
1170 impl AsFd for Receiver {
as_fd(&self) -> BorrowedFd<'_>1171     fn as_fd(&self) -> BorrowedFd<'_> {
1172         unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1173     }
1174 }
1175 
/// Reports whether `file`'s metadata describes a FIFO (named or anonymous pipe).
///
/// Any error from querying the metadata is returned unchanged.
fn is_fifo(file: &File) -> io::Result<bool> {
    file.metadata()
        .map(|metadata| metadata.file_type().is_fifo())
}
1180 
1181 /// Gets file descriptor's flags by fcntl.
get_file_flags(file: &File) -> io::Result<libc::c_int>1182 fn get_file_flags(file: &File) -> io::Result<libc::c_int> {
1183     let fd = file.as_raw_fd();
1184     let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
1185     if flags < 0 {
1186         Err(io::Error::last_os_error())
1187     } else {
1188         Ok(flags)
1189     }
1190 }
1191 
1192 /// Checks for O_RDONLY or O_RDWR access mode.
has_read_access(flags: libc::c_int) -> bool1193 fn has_read_access(flags: libc::c_int) -> bool {
1194     let mode = flags & libc::O_ACCMODE;
1195     mode == libc::O_RDONLY || mode == libc::O_RDWR
1196 }
1197 
1198 /// Checks for O_WRONLY or O_RDWR access mode.
has_write_access(flags: libc::c_int) -> bool1199 fn has_write_access(flags: libc::c_int) -> bool {
1200     let mode = flags & libc::O_ACCMODE;
1201     mode == libc::O_WRONLY || mode == libc::O_RDWR
1202 }
1203 
1204 /// Sets file's flags with O_NONBLOCK by fcntl.
set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()>1205 fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> {
1206     let fd = file.as_raw_fd();
1207 
1208     let flags = current_flags | libc::O_NONBLOCK;
1209 
1210     if flags != current_flags {
1211         let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) };
1212         if ret < 0 {
1213             return Err(io::Error::last_os_error());
1214         }
1215     }
1216 
1217     Ok(())
1218 }
1219