1 //! Unix pipe types.
2
3 use crate::io::interest::Interest;
4 use crate::io::{AsyncRead, AsyncWrite, PollEvented, ReadBuf, Ready};
5
6 use mio::unix::pipe as mio_pipe;
7 use std::fs::File;
8 use std::io::{self, Read, Write};
9 use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
10 use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, RawFd};
11 use std::path::Path;
12 use std::pin::Pin;
13 use std::task::{Context, Poll};
14
15 cfg_io_util! {
16 use bytes::BufMut;
17 }
18
/// Builder describing how a FIFO file should be opened as a pipe end.
///
/// The usual workflow is to create a blank set of options with [`new`],
/// chain the setters for the options you need, and finish with either
/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file.
/// Both of those return an [`io::Result`][result] wrapping the pipe end,
/// which you can then operate on.
///
/// [`new`]: OpenOptions::new
/// [`open_receiver`]: OpenOptions::open_receiver
/// [`open_sender`]: OpenOptions::open_sender
/// [result]: std::io::Result
///
/// # Examples
///
/// Opening both ends of a pipe from the same FIFO file:
///
/// ```no_run
/// use tokio::net::unix::pipe;
/// # use std::error::Error;
///
/// const FIFO_NAME: &str = "path/to/a/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?;
/// # Ok(())
/// # }
/// ```
///
/// Opening a [`Sender`] on Linux when the file is known to be a FIFO:
///
/// ```ignore
/// use tokio::net::unix::pipe;
/// use nix::{unistd::mkfifo, sys::stat::Mode};
/// # use std::error::Error;
///
/// // Our program has exclusive access to this path.
/// const FIFO_NAME: &str = "path/to/a/new/fifo";
///
/// # async fn dox() -> Result<(), Box<dyn Error>> {
/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
/// let tx = pipe::OpenOptions::new()
///     .read_write(true)
///     .unchecked(true)
///     .open_sender(FIFO_NAME)?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct OpenOptions {
    // When set, the FIFO is opened with both read and write access
    // (configured through `read_write`, Linux only).
    #[cfg(target_os = "linux")]
    read_write: bool,
    // When set, the FIFO file type check performed after opening is skipped
    // (configured through `unchecked`).
    unchecked: bool,
}
75
76 impl OpenOptions {
77 /// Creates a blank new set of options ready for configuration.
78 ///
79 /// All options are initially set to `false`.
new() -> OpenOptions80 pub fn new() -> OpenOptions {
81 OpenOptions {
82 #[cfg(target_os = "linux")]
83 read_write: false,
84 unchecked: false,
85 }
86 }
87
88 /// Sets the option for read-write access.
89 ///
90 /// This option, when true, will indicate that a FIFO file will be opened
91 /// in read-write access mode. This operation is not defined by the POSIX
92 /// standard and is only guaranteed to work on Linux.
93 ///
94 /// # Examples
95 ///
96 /// Opening a [`Sender`] even if there are no open reading ends:
97 ///
98 /// ```ignore
99 /// use tokio::net::unix::pipe;
100 ///
101 /// let tx = pipe::OpenOptions::new()
102 /// .read_write(true)
103 /// .open_sender("path/to/a/fifo");
104 /// ```
105 ///
106 /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not
107 /// fail with [`UnexpectedEof`] during reading if all writing ends of the
108 /// pipe close the FIFO file.
109 ///
110 /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof
111 ///
112 /// ```ignore
113 /// use tokio::net::unix::pipe;
114 ///
115 /// let tx = pipe::OpenOptions::new()
116 /// .read_write(true)
117 /// .open_receiver("path/to/a/fifo");
118 /// ```
119 #[cfg(target_os = "linux")]
120 #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))]
read_write(&mut self, value: bool) -> &mut Self121 pub fn read_write(&mut self, value: bool) -> &mut Self {
122 self.read_write = value;
123 self
124 }
125
126 /// Sets the option to skip the check for FIFO file type.
127 ///
128 /// By default, [`open_receiver`] and [`open_sender`] functions will check
129 /// if the opened file is a FIFO file. Set this option to `true` if you are
130 /// sure the file is a FIFO file.
131 ///
132 /// [`open_receiver`]: OpenOptions::open_receiver
133 /// [`open_sender`]: OpenOptions::open_sender
134 ///
135 /// # Examples
136 ///
137 /// ```no_run
138 /// use tokio::net::unix::pipe;
139 /// use nix::{unistd::mkfifo, sys::stat::Mode};
140 /// # use std::error::Error;
141 ///
142 /// // Our program has exclusive access to this path.
143 /// const FIFO_NAME: &str = "path/to/a/new/fifo";
144 ///
145 /// # async fn dox() -> Result<(), Box<dyn Error>> {
146 /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?;
147 /// let rx = pipe::OpenOptions::new()
148 /// .unchecked(true)
149 /// .open_receiver(FIFO_NAME)?;
150 /// # Ok(())
151 /// # }
152 /// ```
unchecked(&mut self, value: bool) -> &mut Self153 pub fn unchecked(&mut self, value: bool) -> &mut Self {
154 self.unchecked = value;
155 self
156 }
157
158 /// Creates a [`Receiver`] from a FIFO file with the options specified by `self`.
159 ///
160 /// This function will open the FIFO file at the specified path, possibly
161 /// check if it is a pipe, and associate the pipe with the default event
162 /// loop for reading.
163 ///
164 /// # Errors
165 ///
166 /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
167 /// This function may also fail with other standard OS errors.
168 ///
169 /// # Panics
170 ///
171 /// This function panics if it is not called from within a runtime with
172 /// IO enabled.
173 ///
174 /// The runtime is usually set implicitly when this function is called
175 /// from a future driven by a tokio runtime, otherwise runtime can be set
176 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver>177 pub fn open_receiver<P: AsRef<Path>>(&self, path: P) -> io::Result<Receiver> {
178 let file = self.open(path.as_ref(), PipeEnd::Receiver)?;
179 Receiver::from_file_unchecked(file)
180 }
181
182 /// Creates a [`Sender`] from a FIFO file with the options specified by `self`.
183 ///
184 /// This function will open the FIFO file at the specified path, possibly
185 /// check if it is a pipe, and associate the pipe with the default event
186 /// loop for writing.
187 ///
188 /// # Errors
189 ///
190 /// If the file type check fails, this function will fail with `io::ErrorKind::InvalidInput`.
191 /// If the file is not opened in read-write access mode and the file is not
192 /// currently open for reading, this function will fail with `ENXIO`.
193 /// This function may also fail with other standard OS errors.
194 ///
195 /// # Panics
196 ///
197 /// This function panics if it is not called from within a runtime with
198 /// IO enabled.
199 ///
200 /// The runtime is usually set implicitly when this function is called
201 /// from a future driven by a tokio runtime, otherwise runtime can be set
202 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender>203 pub fn open_sender<P: AsRef<Path>>(&self, path: P) -> io::Result<Sender> {
204 let file = self.open(path.as_ref(), PipeEnd::Sender)?;
205 Sender::from_file_unchecked(file)
206 }
207
open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File>208 fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result<File> {
209 let mut options = std::fs::OpenOptions::new();
210 options
211 .read(pipe_end == PipeEnd::Receiver)
212 .write(pipe_end == PipeEnd::Sender)
213 .custom_flags(libc::O_NONBLOCK);
214
215 #[cfg(target_os = "linux")]
216 if self.read_write {
217 options.read(true).write(true);
218 }
219
220 let file = options.open(path)?;
221
222 if !self.unchecked && !is_fifo(&file)? {
223 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
224 }
225
226 Ok(file)
227 }
228 }
229
230 impl Default for OpenOptions {
default() -> OpenOptions231 fn default() -> OpenOptions {
232 OpenOptions::new()
233 }
234 }
235
/// Selects which end of a pipe a FIFO file is being opened as.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PipeEnd {
    /// The writing end.
    Sender,
    /// The reading end.
    Receiver,
}
241
242 /// Writing end of a Unix pipe.
243 ///
244 /// It can be constructed from a FIFO file with [`OpenOptions::open_sender`].
245 ///
246 /// Opening a named pipe for writing involves a few steps.
247 /// Call to [`OpenOptions::open_sender`] might fail with an error indicating
248 /// different things:
249 ///
250 /// * [`io::ErrorKind::NotFound`] - There is no file at the specified path.
251 /// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO.
252 /// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading.
253 /// Sleep for a while and try again.
254 /// * Other OS errors not specific to opening FIFO files.
255 ///
256 /// Opening a `Sender` from a FIFO file should look like this:
257 ///
258 /// ```no_run
259 /// use tokio::net::unix::pipe;
260 /// use tokio::time::{self, Duration};
261 ///
262 /// const FIFO_NAME: &str = "path/to/a/fifo";
263 ///
264 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
265 /// // Wait for a reader to open the file.
266 /// let tx = loop {
267 /// match pipe::OpenOptions::new().open_sender(FIFO_NAME) {
268 /// Ok(tx) => break tx,
269 /// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {},
270 /// Err(e) => return Err(e.into()),
271 /// }
272 ///
273 /// time::sleep(Duration::from_millis(50)).await;
274 /// };
275 /// # Ok(())
276 /// # }
277 /// ```
278 ///
279 /// On Linux, it is possible to create a `Sender` without waiting in a sleeping
280 /// loop. This is done by opening a named pipe in read-write access mode with
281 /// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold
282 /// both a writing end and a reading end, and the latter allows to open a FIFO
283 /// without [`ENXIO`] error since the pipe is open for reading as well.
284 ///
285 /// `Sender` cannot be used to read from a pipe, so in practice the read access
286 /// is only used when a FIFO is opened. However, using a `Sender` in read-write
287 /// mode **may lead to lost data**, because written data will be dropped by the
288 /// system as soon as all pipe ends are closed. To avoid lost data you have to
289 /// make sure that a reading end has been opened before dropping a `Sender`.
290 ///
291 /// Note that using read-write access mode with FIFO files is not defined by
292 /// the POSIX standard and it is only guaranteed to work on Linux.
293 ///
294 /// ```ignore
295 /// use tokio::io::AsyncWriteExt;
296 /// use tokio::net::unix::pipe;
297 ///
298 /// const FIFO_NAME: &str = "path/to/a/fifo";
299 ///
300 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
301 /// let mut tx = pipe::OpenOptions::new()
302 /// .read_write(true)
303 /// .open_sender(FIFO_NAME)?;
304 ///
305 /// // Asynchronously write to the pipe before a reader.
306 /// tx.write_all(b"hello world").await?;
307 /// # Ok(())
308 /// # }
309 /// ```
310 ///
311 /// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html
312 #[derive(Debug)]
313 pub struct Sender {
314 io: PollEvented<mio_pipe::Sender>,
315 }
316
317 impl Sender {
from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender>318 fn from_mio(mio_tx: mio_pipe::Sender) -> io::Result<Sender> {
319 let io = PollEvented::new_with_interest(mio_tx, Interest::WRITABLE)?;
320 Ok(Sender { io })
321 }
322
323 /// Creates a new `Sender` from a [`File`].
324 ///
325 /// This function is intended to construct a pipe from a [`File`] representing
326 /// a special FIFO file. It will check if the file is a pipe and has write access,
327 /// set it in non-blocking mode and perform the conversion.
328 ///
329 /// # Errors
330 ///
331 /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
332 /// does not have write access. Also fails with any standard OS error if it occurs.
333 ///
334 /// # Panics
335 ///
336 /// This function panics if it is not called from within a runtime with
337 /// IO enabled.
338 ///
339 /// The runtime is usually set implicitly when this function is called
340 /// from a future driven by a tokio runtime, otherwise runtime can be set
341 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file(mut file: File) -> io::Result<Sender>342 pub fn from_file(mut file: File) -> io::Result<Sender> {
343 if !is_fifo(&file)? {
344 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
345 }
346
347 let flags = get_file_flags(&file)?;
348 if has_write_access(flags) {
349 set_nonblocking(&mut file, flags)?;
350 Sender::from_file_unchecked(file)
351 } else {
352 Err(io::Error::new(
353 io::ErrorKind::InvalidInput,
354 "not in O_WRONLY or O_RDWR access mode",
355 ))
356 }
357 }
358
359 /// Creates a new `Sender` from a [`File`] without checking pipe properties.
360 ///
361 /// This function is intended to construct a pipe from a File representing
362 /// a special FIFO file. The conversion assumes nothing about the underlying
363 /// file; it is left up to the user to make sure it is opened with write access,
364 /// represents a pipe and is set in non-blocking mode.
365 ///
366 /// # Examples
367 ///
368 /// ```no_run
369 /// use tokio::net::unix::pipe;
370 /// use std::fs::OpenOptions;
371 /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
372 /// # use std::error::Error;
373 ///
374 /// const FIFO_NAME: &str = "path/to/a/fifo";
375 ///
376 /// # async fn dox() -> Result<(), Box<dyn Error>> {
377 /// let file = OpenOptions::new()
378 /// .write(true)
379 /// .custom_flags(libc::O_NONBLOCK)
380 /// .open(FIFO_NAME)?;
381 /// if file.metadata()?.file_type().is_fifo() {
382 /// let tx = pipe::Sender::from_file_unchecked(file)?;
383 /// /* use the Sender */
384 /// }
385 /// # Ok(())
386 /// # }
387 /// ```
388 ///
389 /// # Panics
390 ///
391 /// This function panics if it is not called from within a runtime with
392 /// IO enabled.
393 ///
394 /// The runtime is usually set implicitly when this function is called
395 /// from a future driven by a tokio runtime, otherwise runtime can be set
396 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file_unchecked(file: File) -> io::Result<Sender>397 pub fn from_file_unchecked(file: File) -> io::Result<Sender> {
398 let raw_fd = file.into_raw_fd();
399 let mio_tx = unsafe { mio_pipe::Sender::from_raw_fd(raw_fd) };
400 Sender::from_mio(mio_tx)
401 }
402
403 /// Waits for any of the requested ready states.
404 ///
405 /// This function can be used instead of [`writable()`] to check the returned
406 /// ready set for [`Ready::WRITABLE`] and [`Ready::WRITE_CLOSED`] events.
407 ///
408 /// The function may complete without the pipe being ready. This is a
409 /// false-positive and attempting an operation will return with
410 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
411 /// [`Ready`] set, so you should always check the returned value and possibly
412 /// wait again if the requested states are not set.
413 ///
414 /// [`writable()`]: Self::writable
415 ///
416 /// # Cancel safety
417 ///
418 /// This method is cancel safe. Once a readiness event occurs, the method
419 /// will continue to return immediately until the readiness event is
420 /// consumed by an attempt to write that fails with `WouldBlock` or
421 /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>422 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
423 let event = self.io.registration().readiness(interest).await?;
424 Ok(event.ready)
425 }
426
427 /// Waits for the pipe to become writable.
428 ///
429 /// This function is equivalent to `ready(Interest::WRITABLE)` and is usually
430 /// paired with [`try_write()`].
431 ///
432 /// [`try_write()`]: Self::try_write
433 ///
434 /// # Examples
435 ///
436 /// ```no_run
437 /// use tokio::net::unix::pipe;
438 /// use std::io;
439 ///
440 /// #[tokio::main]
441 /// async fn main() -> io::Result<()> {
442 /// // Open a writing end of a fifo
443 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
444 ///
445 /// loop {
446 /// // Wait for the pipe to be writable
447 /// tx.writable().await?;
448 ///
449 /// // Try to write data, this may still fail with `WouldBlock`
450 /// // if the readiness event is a false positive.
451 /// match tx.try_write(b"hello world") {
452 /// Ok(n) => {
453 /// break;
454 /// }
455 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
456 /// continue;
457 /// }
458 /// Err(e) => {
459 /// return Err(e.into());
460 /// }
461 /// }
462 /// }
463 ///
464 /// Ok(())
465 /// }
466 /// ```
writable(&self) -> io::Result<()>467 pub async fn writable(&self) -> io::Result<()> {
468 self.ready(Interest::WRITABLE).await?;
469 Ok(())
470 }
471
472 /// Polls for write readiness.
473 ///
474 /// If the pipe is not currently ready for writing, this method will
475 /// store a clone of the `Waker` from the provided `Context`. When the pipe
476 /// becomes ready for writing, `Waker::wake` will be called on the waker.
477 ///
478 /// Note that on multiple calls to `poll_write_ready` or `poll_write`, only
479 /// the `Waker` from the `Context` passed to the most recent call is
480 /// scheduled to receive a wakeup.
481 ///
482 /// This function is intended for cases where creating and pinning a future
483 /// via [`writable`] is not feasible. Where possible, using [`writable`] is
484 /// preferred, as this supports polling from multiple tasks at once.
485 ///
486 /// [`writable`]: Self::writable
487 ///
488 /// # Return value
489 ///
490 /// The function returns:
491 ///
492 /// * `Poll::Pending` if the pipe is not ready for writing.
493 /// * `Poll::Ready(Ok(()))` if the pipe is ready for writing.
494 /// * `Poll::Ready(Err(e))` if an error is encountered.
495 ///
496 /// # Errors
497 ///
498 /// This function may encounter any standard I/O error except `WouldBlock`.
poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>499 pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
500 self.io.registration().poll_write_ready(cx).map_ok(|_| ())
501 }
502
503 /// Tries to write a buffer to the pipe, returning how many bytes were
504 /// written.
505 ///
506 /// The function will attempt to write the entire contents of `buf`, but
507 /// only part of the buffer may be written. If the length of `buf` is not
508 /// greater than `PIPE_BUF` (an OS constant, 4096 under Linux), then the
509 /// write is guaranteed to be atomic, i.e. either the entire content of
510 /// `buf` will be written or this method will fail with `WouldBlock`. There
511 /// is no such guarantee if `buf` is larger than `PIPE_BUF`.
512 ///
513 /// This function is usually paired with [`writable`].
514 ///
515 /// [`writable`]: Self::writable
516 ///
517 /// # Return
518 ///
519 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
520 /// number of bytes written. If the pipe is not ready to write data,
521 /// `Err(io::ErrorKind::WouldBlock)` is returned.
522 ///
523 /// # Examples
524 ///
525 /// ```no_run
526 /// use tokio::net::unix::pipe;
527 /// use std::io;
528 ///
529 /// #[tokio::main]
530 /// async fn main() -> io::Result<()> {
531 /// // Open a writing end of a fifo
532 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
533 ///
534 /// loop {
535 /// // Wait for the pipe to be writable
536 /// tx.writable().await?;
537 ///
538 /// // Try to write data, this may still fail with `WouldBlock`
539 /// // if the readiness event is a false positive.
540 /// match tx.try_write(b"hello world") {
541 /// Ok(n) => {
542 /// break;
543 /// }
544 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
545 /// continue;
546 /// }
547 /// Err(e) => {
548 /// return Err(e.into());
549 /// }
550 /// }
551 /// }
552 ///
553 /// Ok(())
554 /// }
555 /// ```
try_write(&self, buf: &[u8]) -> io::Result<usize>556 pub fn try_write(&self, buf: &[u8]) -> io::Result<usize> {
557 self.io
558 .registration()
559 .try_io(Interest::WRITABLE, || (&*self.io).write(buf))
560 }
561
562 /// Tries to write several buffers to the pipe, returning how many bytes
563 /// were written.
564 ///
565 /// Data is written from each buffer in order, with the final buffer read
566 /// from possible being only partially consumed. This method behaves
567 /// equivalently to a single call to [`try_write()`] with concatenated
568 /// buffers.
569 ///
570 /// If the total length of buffers is not greater than `PIPE_BUF` (an OS
571 /// constant, 4096 under Linux), then the write is guaranteed to be atomic,
572 /// i.e. either the entire contents of buffers will be written or this
573 /// method will fail with `WouldBlock`. There is no such guarantee if the
574 /// total length of buffers is greater than `PIPE_BUF`.
575 ///
576 /// This function is usually paired with [`writable`].
577 ///
578 /// [`try_write()`]: Self::try_write()
579 /// [`writable`]: Self::writable
580 ///
581 /// # Return
582 ///
583 /// If data is successfully written, `Ok(n)` is returned, where `n` is the
584 /// number of bytes written. If the pipe is not ready to write data,
585 /// `Err(io::ErrorKind::WouldBlock)` is returned.
586 ///
587 /// # Examples
588 ///
589 /// ```no_run
590 /// use tokio::net::unix::pipe;
591 /// use std::io;
592 ///
593 /// #[tokio::main]
594 /// async fn main() -> io::Result<()> {
595 /// // Open a writing end of a fifo
596 /// let tx = pipe::OpenOptions::new().open_sender("path/to/a/fifo")?;
597 ///
598 /// let bufs = [io::IoSlice::new(b"hello "), io::IoSlice::new(b"world")];
599 ///
600 /// loop {
601 /// // Wait for the pipe to be writable
602 /// tx.writable().await?;
603 ///
604 /// // Try to write data, this may still fail with `WouldBlock`
605 /// // if the readiness event is a false positive.
606 /// match tx.try_write_vectored(&bufs) {
607 /// Ok(n) => {
608 /// break;
609 /// }
610 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
611 /// continue;
612 /// }
613 /// Err(e) => {
614 /// return Err(e.into());
615 /// }
616 /// }
617 /// }
618 ///
619 /// Ok(())
620 /// }
621 /// ```
try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize>622 pub fn try_write_vectored(&self, buf: &[io::IoSlice<'_>]) -> io::Result<usize> {
623 self.io
624 .registration()
625 .try_io(Interest::WRITABLE, || (&*self.io).write_vectored(buf))
626 }
627 }
628
629 impl AsyncWrite for Sender {
poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], ) -> Poll<io::Result<usize>>630 fn poll_write(
631 self: Pin<&mut Self>,
632 cx: &mut Context<'_>,
633 buf: &[u8],
634 ) -> Poll<io::Result<usize>> {
635 self.io.poll_write(cx, buf)
636 }
637
poll_write_vectored( self: Pin<&mut Self>, cx: &mut Context<'_>, bufs: &[io::IoSlice<'_>], ) -> Poll<io::Result<usize>>638 fn poll_write_vectored(
639 self: Pin<&mut Self>,
640 cx: &mut Context<'_>,
641 bufs: &[io::IoSlice<'_>],
642 ) -> Poll<io::Result<usize>> {
643 self.io.poll_write_vectored(cx, bufs)
644 }
645
is_write_vectored(&self) -> bool646 fn is_write_vectored(&self) -> bool {
647 true
648 }
649
poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>650 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
651 Poll::Ready(Ok(()))
652 }
653
poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>>654 fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
655 Poll::Ready(Ok(()))
656 }
657 }
658
659 impl AsRawFd for Sender {
as_raw_fd(&self) -> RawFd660 fn as_raw_fd(&self) -> RawFd {
661 self.io.as_raw_fd()
662 }
663 }
664
665 impl AsFd for Sender {
as_fd(&self) -> BorrowedFd<'_>666 fn as_fd(&self) -> BorrowedFd<'_> {
667 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
668 }
669 }
670
671 /// Reading end of a Unix pipe.
672 ///
673 /// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`].
674 ///
675 /// # Examples
676 ///
677 /// Receiving messages from a named pipe in a loop:
678 ///
679 /// ```no_run
680 /// use tokio::net::unix::pipe;
681 /// use tokio::io::{self, AsyncReadExt};
682 ///
683 /// const FIFO_NAME: &str = "path/to/a/fifo";
684 ///
685 /// # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
686 /// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
687 /// loop {
688 /// let mut msg = vec![0; 256];
689 /// match rx.read_exact(&mut msg).await {
690 /// Ok(_) => {
691 /// /* handle the message */
692 /// }
693 /// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
694 /// // Writing end has been closed, we should reopen the pipe.
695 /// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?;
696 /// }
697 /// Err(e) => return Err(e.into()),
698 /// }
699 /// }
700 /// # }
701 /// ```
702 ///
703 /// On Linux, you can use a `Receiver` in read-write access mode to implement
704 /// resilient reading from a named pipe. Unlike `Receiver` opened in read-only
705 /// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof`
706 /// when the writing end is closed. This way, a `Receiver` can asynchronously
707 /// wait for the next writer to open the pipe.
708 ///
709 /// You should not use functions waiting for EOF such as [`read_to_end`] with
710 /// a `Receiver` in read-write access mode, since it **may wait forever**.
711 /// `Receiver` in this mode also holds an open writing end, which prevents
712 /// receiving EOF.
713 ///
714 /// To set the read-write access mode you can use `OpenOptions::read_write`.
715 /// Note that using read-write access mode with FIFO files is not defined by
716 /// the POSIX standard and it is only guaranteed to work on Linux.
717 ///
718 /// ```ignore
719 /// use tokio::net::unix::pipe;
720 /// use tokio::io::AsyncReadExt;
721 /// # use std::error::Error;
722 ///
723 /// const FIFO_NAME: &str = "path/to/a/fifo";
724 ///
725 /// # async fn dox() -> Result<(), Box<dyn Error>> {
726 /// let mut rx = pipe::OpenOptions::new()
727 /// .read_write(true)
728 /// .open_receiver(FIFO_NAME)?;
729 /// loop {
730 /// let mut msg = vec![0; 256];
731 /// rx.read_exact(&mut msg).await?;
732 /// /* handle the message */
733 /// }
734 /// # }
735 /// ```
736 ///
737 /// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end
738 #[derive(Debug)]
739 pub struct Receiver {
740 io: PollEvented<mio_pipe::Receiver>,
741 }
742
743 impl Receiver {
from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver>744 fn from_mio(mio_rx: mio_pipe::Receiver) -> io::Result<Receiver> {
745 let io = PollEvented::new_with_interest(mio_rx, Interest::READABLE)?;
746 Ok(Receiver { io })
747 }
748
749 /// Creates a new `Receiver` from a [`File`].
750 ///
751 /// This function is intended to construct a pipe from a [`File`] representing
752 /// a special FIFO file. It will check if the file is a pipe and has read access,
753 /// set it in non-blocking mode and perform the conversion.
754 ///
755 /// # Errors
756 ///
757 /// Fails with `io::ErrorKind::InvalidInput` if the file is not a pipe or it
758 /// does not have read access. Also fails with any standard OS error if it occurs.
759 ///
760 /// # Panics
761 ///
762 /// This function panics if it is not called from within a runtime with
763 /// IO enabled.
764 ///
765 /// The runtime is usually set implicitly when this function is called
766 /// from a future driven by a tokio runtime, otherwise runtime can be set
767 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file(mut file: File) -> io::Result<Receiver>768 pub fn from_file(mut file: File) -> io::Result<Receiver> {
769 if !is_fifo(&file)? {
770 return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe"));
771 }
772
773 let flags = get_file_flags(&file)?;
774 if has_read_access(flags) {
775 set_nonblocking(&mut file, flags)?;
776 Receiver::from_file_unchecked(file)
777 } else {
778 Err(io::Error::new(
779 io::ErrorKind::InvalidInput,
780 "not in O_RDONLY or O_RDWR access mode",
781 ))
782 }
783 }
784
785 /// Creates a new `Receiver` from a [`File`] without checking pipe properties.
786 ///
787 /// This function is intended to construct a pipe from a File representing
788 /// a special FIFO file. The conversion assumes nothing about the underlying
789 /// file; it is left up to the user to make sure it is opened with read access,
790 /// represents a pipe and is set in non-blocking mode.
791 ///
792 /// # Examples
793 ///
794 /// ```no_run
795 /// use tokio::net::unix::pipe;
796 /// use std::fs::OpenOptions;
797 /// use std::os::unix::fs::{FileTypeExt, OpenOptionsExt};
798 /// # use std::error::Error;
799 ///
800 /// const FIFO_NAME: &str = "path/to/a/fifo";
801 ///
802 /// # async fn dox() -> Result<(), Box<dyn Error>> {
803 /// let file = OpenOptions::new()
804 /// .read(true)
805 /// .custom_flags(libc::O_NONBLOCK)
806 /// .open(FIFO_NAME)?;
807 /// if file.metadata()?.file_type().is_fifo() {
808 /// let rx = pipe::Receiver::from_file_unchecked(file)?;
809 /// /* use the Receiver */
810 /// }
811 /// # Ok(())
812 /// # }
813 /// ```
814 ///
815 /// # Panics
816 ///
817 /// This function panics if it is not called from within a runtime with
818 /// IO enabled.
819 ///
820 /// The runtime is usually set implicitly when this function is called
821 /// from a future driven by a tokio runtime, otherwise runtime can be set
822 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
from_file_unchecked(file: File) -> io::Result<Receiver>823 pub fn from_file_unchecked(file: File) -> io::Result<Receiver> {
824 let raw_fd = file.into_raw_fd();
825 let mio_rx = unsafe { mio_pipe::Receiver::from_raw_fd(raw_fd) };
826 Receiver::from_mio(mio_rx)
827 }
828
829 /// Waits for any of the requested ready states.
830 ///
831 /// This function can be used instead of [`readable()`] to check the returned
832 /// ready set for [`Ready::READABLE`] and [`Ready::READ_CLOSED`] events.
833 ///
834 /// The function may complete without the pipe being ready. This is a
835 /// false-positive and attempting an operation will return with
836 /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
837 /// [`Ready`] set, so you should always check the returned value and possibly
838 /// wait again if the requested states are not set.
839 ///
840 /// [`readable()`]: Self::readable
841 ///
842 /// # Cancel safety
843 ///
844 /// This method is cancel safe. Once a readiness event occurs, the method
845 /// will continue to return immediately until the readiness event is
846 /// consumed by an attempt to read that fails with `WouldBlock` or
847 /// `Poll::Pending`.
ready(&self, interest: Interest) -> io::Result<Ready>848 pub async fn ready(&self, interest: Interest) -> io::Result<Ready> {
849 let event = self.io.registration().readiness(interest).await?;
850 Ok(event.ready)
851 }
852
853 /// Waits for the pipe to become readable.
854 ///
855 /// This function is equivalent to `ready(Interest::READABLE)` and is usually
856 /// paired with [`try_read()`].
857 ///
858 /// [`try_read()`]: Self::try_read()
859 ///
860 /// # Examples
861 ///
862 /// ```no_run
863 /// use tokio::net::unix::pipe;
864 /// use std::io;
865 ///
866 /// #[tokio::main]
867 /// async fn main() -> io::Result<()> {
868 /// // Open a reading end of a fifo
869 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
870 ///
871 /// let mut msg = vec![0; 1024];
872 ///
873 /// loop {
874 /// // Wait for the pipe to be readable
875 /// rx.readable().await?;
876 ///
877 /// // Try to read data, this may still fail with `WouldBlock`
878 /// // if the readiness event is a false positive.
879 /// match rx.try_read(&mut msg) {
880 /// Ok(n) => {
881 /// msg.truncate(n);
882 /// break;
883 /// }
884 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
885 /// continue;
886 /// }
887 /// Err(e) => {
888 /// return Err(e.into());
889 /// }
890 /// }
891 /// }
892 ///
893 /// println!("GOT = {:?}", msg);
894 /// Ok(())
895 /// }
896 /// ```
readable(&self) -> io::Result<()>897 pub async fn readable(&self) -> io::Result<()> {
898 self.ready(Interest::READABLE).await?;
899 Ok(())
900 }
901
902 /// Polls for read readiness.
903 ///
904 /// If the pipe is not currently ready for reading, this method will
905 /// store a clone of the `Waker` from the provided `Context`. When the pipe
906 /// becomes ready for reading, `Waker::wake` will be called on the waker.
907 ///
908 /// Note that on multiple calls to `poll_read_ready` or `poll_read`, only
909 /// the `Waker` from the `Context` passed to the most recent call is
910 /// scheduled to receive a wakeup.
911 ///
912 /// This function is intended for cases where creating and pinning a future
913 /// via [`readable`] is not feasible. Where possible, using [`readable`] is
914 /// preferred, as this supports polling from multiple tasks at once.
915 ///
916 /// [`readable`]: Self::readable
917 ///
918 /// # Return value
919 ///
920 /// The function returns:
921 ///
922 /// * `Poll::Pending` if the pipe is not ready for reading.
923 /// * `Poll::Ready(Ok(()))` if the pipe is ready for reading.
924 /// * `Poll::Ready(Err(e))` if an error is encountered.
925 ///
926 /// # Errors
927 ///
928 /// This function may encounter any standard I/O error except `WouldBlock`.
poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>929 pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
930 self.io.registration().poll_read_ready(cx).map_ok(|_| ())
931 }
932
933 /// Tries to read data from the pipe into the provided buffer, returning how
934 /// many bytes were read.
935 ///
936 /// Reads any pending data from the pipe but does not wait for new data
937 /// to arrive. On success, returns the number of bytes read. Because
938 /// `try_read()` is non-blocking, the buffer does not have to be stored by
939 /// the async task and can exist entirely on the stack.
940 ///
941 /// Usually [`readable()`] is used with this function.
942 ///
943 /// [`readable()`]: Self::readable()
944 ///
945 /// # Return
946 ///
947 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
948 /// number of bytes read. If `n` is `0`, then it can indicate one of two scenarios:
949 ///
950 /// 1. The pipe's writing end is closed and will no longer write data.
951 /// 2. The specified buffer was 0 bytes in length.
952 ///
953 /// If the pipe is not ready to read data,
954 /// `Err(io::ErrorKind::WouldBlock)` is returned.
955 ///
956 /// # Examples
957 ///
958 /// ```no_run
959 /// use tokio::net::unix::pipe;
960 /// use std::io;
961 ///
962 /// #[tokio::main]
963 /// async fn main() -> io::Result<()> {
964 /// // Open a reading end of a fifo
965 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
966 ///
967 /// let mut msg = vec![0; 1024];
968 ///
969 /// loop {
970 /// // Wait for the pipe to be readable
971 /// rx.readable().await?;
972 ///
973 /// // Try to read data, this may still fail with `WouldBlock`
974 /// // if the readiness event is a false positive.
975 /// match rx.try_read(&mut msg) {
976 /// Ok(n) => {
977 /// msg.truncate(n);
978 /// break;
979 /// }
980 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
981 /// continue;
982 /// }
983 /// Err(e) => {
984 /// return Err(e.into());
985 /// }
986 /// }
987 /// }
988 ///
989 /// println!("GOT = {:?}", msg);
990 /// Ok(())
991 /// }
992 /// ```
try_read(&self, buf: &mut [u8]) -> io::Result<usize>993 pub fn try_read(&self, buf: &mut [u8]) -> io::Result<usize> {
994 self.io
995 .registration()
996 .try_io(Interest::READABLE, || (&*self.io).read(buf))
997 }
998
999 /// Tries to read data from the pipe into the provided buffers, returning
1000 /// how many bytes were read.
1001 ///
1002 /// Data is copied to fill each buffer in order, with the final buffer
1003 /// written to possibly being only partially filled. This method behaves
1004 /// equivalently to a single call to [`try_read()`] with concatenated
1005 /// buffers.
1006 ///
1007 /// Reads any pending data from the pipe but does not wait for new data
1008 /// to arrive. On success, returns the number of bytes read. Because
1009 /// `try_read_vectored()` is non-blocking, the buffer does not have to be
1010 /// stored by the async task and can exist entirely on the stack.
1011 ///
1012 /// Usually, [`readable()`] is used with this function.
1013 ///
1014 /// [`try_read()`]: Self::try_read()
1015 /// [`readable()`]: Self::readable()
1016 ///
1017 /// # Return
1018 ///
1019 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1020 /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1021 /// closed and will no longer write data. If the pipe is not ready to read
1022 /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1023 ///
1024 /// # Examples
1025 ///
1026 /// ```no_run
1027 /// use tokio::net::unix::pipe;
1028 /// use std::io;
1029 ///
1030 /// #[tokio::main]
1031 /// async fn main() -> io::Result<()> {
1032 /// // Open a reading end of a fifo
1033 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1034 ///
1035 /// loop {
1036 /// // Wait for the pipe to be readable
1037 /// rx.readable().await?;
1038 ///
1039 /// // Creating the buffer **after** the `await` prevents it from
1040 /// // being stored in the async task.
1041 /// let mut buf_a = [0; 512];
1042 /// let mut buf_b = [0; 1024];
1043 /// let mut bufs = [
1044 /// io::IoSliceMut::new(&mut buf_a),
1045 /// io::IoSliceMut::new(&mut buf_b),
1046 /// ];
1047 ///
1048 /// // Try to read data, this may still fail with `WouldBlock`
1049 /// // if the readiness event is a false positive.
1050 /// match rx.try_read_vectored(&mut bufs) {
1051 /// Ok(0) => break,
1052 /// Ok(n) => {
1053 /// println!("read {} bytes", n);
1054 /// }
1055 /// Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
1056 /// continue;
1057 /// }
1058 /// Err(e) => {
1059 /// return Err(e.into());
1060 /// }
1061 /// }
1062 /// }
1063 ///
1064 /// Ok(())
1065 /// }
1066 /// ```
try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize>1067 pub fn try_read_vectored(&self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result<usize> {
1068 self.io
1069 .registration()
1070 .try_io(Interest::READABLE, || (&*self.io).read_vectored(bufs))
1071 }
1072
1073 cfg_io_util! {
1074 /// Tries to read data from the pipe into the provided buffer, advancing the
1075 /// buffer's internal cursor, returning how many bytes were read.
1076 ///
1077 /// Reads any pending data from the pipe but does not wait for new data
1078 /// to arrive. On success, returns the number of bytes read. Because
1079 /// `try_read_buf()` is non-blocking, the buffer does not have to be stored by
1080 /// the async task and can exist entirely on the stack.
1081 ///
1082 /// Usually, [`readable()`] or [`ready()`] is used with this function.
1083 ///
1084 /// [`readable()`]: Self::readable
1085 /// [`ready()`]: Self::ready
1086 ///
1087 /// # Return
1088 ///
1089 /// If data is successfully read, `Ok(n)` is returned, where `n` is the
1090 /// number of bytes read. `Ok(0)` indicates the pipe's writing end is
1091 /// closed and will no longer write data. If the pipe is not ready to read
1092 /// data `Err(io::ErrorKind::WouldBlock)` is returned.
1093 ///
1094 /// # Examples
1095 ///
1096 /// ```no_run
1097 /// use tokio::net::unix::pipe;
1098 /// use std::io;
1099 ///
1100 /// #[tokio::main]
1101 /// async fn main() -> io::Result<()> {
1102 /// // Open a reading end of a fifo
1103 /// let rx = pipe::OpenOptions::new().open_receiver("path/to/a/fifo")?;
1104 ///
1105 /// loop {
1106 /// // Wait for the pipe to be readable
1107 /// rx.readable().await?;
1108 ///
1109 /// let mut buf = Vec::with_capacity(4096);
1110 ///
1111 /// // Try to read data, this may still fail with `WouldBlock`
1112 /// // if the readiness event is a false positive.
1113 /// match rx.try_read_buf(&mut buf) {
1114 /// Ok(0) => break,
1115 /// Ok(n) => {
1116 /// println!("read {} bytes", n);
1117 /// }
1118 /// Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1119 /// continue;
1120 /// }
1121 /// Err(e) => {
1122 /// return Err(e.into());
1123 /// }
1124 /// }
1125 /// }
1126 ///
1127 /// Ok(())
1128 /// }
1129 /// ```
pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> io::Result<usize> {
    // Run a single read attempt under the registration's `try_io` for
    // `READABLE` interest.
    self.io.registration().try_io(Interest::READABLE, || {
        use std::io::Read;

        // Writable chunk exposed by `buf` via `chunk_mut()`.
        let dst = buf.chunk_mut();
        // Reinterpret that chunk as a plain `&mut [u8]` so it can be passed to
        // `Read::read`. The cast goes through `[MaybeUninit<u8>]`: the bytes
        // are not assumed to be initialized at this point.
        let dst =
            unsafe { &mut *(dst as *mut _ as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]) };

        // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
        // which correctly handles reads into uninitialized memory.
        let n = (&*self.io).read(dst)?;

        // Safety: `read` reported that `n` bytes were written into `dst`,
        // which is the chunk obtained from `buf` above, so the cursor is
        // advanced by exactly that many bytes.
        unsafe {
            buf.advance_mut(n);
        }

        Ok(n)
    })
}
1149 }
1150 }
1151
/// Asynchronous reading for the receiving end of a pipe, delegating to the
/// `poll_read` of the `io` field.
impl AsyncRead for Receiver {
    /// Attempts to read from the pipe into `buf`, forwarding `cx` and `buf`
    /// to `self.io.poll_read` and returning its result unchanged.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // Safety: `mio_pipe::Receiver` uses a `std::fs::File` underneath,
        // which correctly handles reads into uninitialized memory.
        unsafe { self.io.poll_read(cx, buf) }
    }
}
1163
/// Exposes the raw file descriptor of the receiving pipe end.
impl AsRawFd for Receiver {
    /// Returns the raw file descriptor reported by the `io` field. The
    /// descriptor is only borrowed: nothing here transfers ownership of it.
    fn as_raw_fd(&self) -> RawFd {
        self.io.as_raw_fd()
    }
}
1169
1170 impl AsFd for Receiver {
as_fd(&self) -> BorrowedFd<'_>1171 fn as_fd(&self) -> BorrowedFd<'_> {
1172 unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }
1173 }
1174 }
1175
/// Reports whether `file` is a FIFO, according to its metadata.
///
/// Fails with the underlying I/O error if the metadata cannot be queried.
fn is_fifo(file: &File) -> io::Result<bool> {
    file.metadata()
        .map(|metadata| metadata.file_type().is_fifo())
}
1180
1181 /// Gets file descriptor's flags by fcntl.
get_file_flags(file: &File) -> io::Result<libc::c_int>1182 fn get_file_flags(file: &File) -> io::Result<libc::c_int> {
1183 let fd = file.as_raw_fd();
1184 let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
1185 if flags < 0 {
1186 Err(io::Error::last_os_error())
1187 } else {
1188 Ok(flags)
1189 }
1190 }
1191
1192 /// Checks for O_RDONLY or O_RDWR access mode.
has_read_access(flags: libc::c_int) -> bool1193 fn has_read_access(flags: libc::c_int) -> bool {
1194 let mode = flags & libc::O_ACCMODE;
1195 mode == libc::O_RDONLY || mode == libc::O_RDWR
1196 }
1197
1198 /// Checks for O_WRONLY or O_RDWR access mode.
has_write_access(flags: libc::c_int) -> bool1199 fn has_write_access(flags: libc::c_int) -> bool {
1200 let mode = flags & libc::O_ACCMODE;
1201 mode == libc::O_WRONLY || mode == libc::O_RDWR
1202 }
1203
1204 /// Sets file's flags with O_NONBLOCK by fcntl.
set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()>1205 fn set_nonblocking(file: &mut File, current_flags: libc::c_int) -> io::Result<()> {
1206 let fd = file.as_raw_fd();
1207
1208 let flags = current_flags | libc::O_NONBLOCK;
1209
1210 if flags != current_flags {
1211 let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags) };
1212 if ret < 0 {
1213 return Err(io::Error::last_os_error());
1214 }
1215 }
1216
1217 Ok(())
1218 }
1219