• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg(feature = "full")]
2 #![cfg(unix)]
3 
4 use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest};
5 use tokio::net::unix::pipe;
6 use tokio_test::task;
7 use tokio_test::{assert_err, assert_ok, assert_pending, assert_ready_ok};
8 
9 use std::fs::File;
10 use std::io;
11 use std::os::unix::fs::OpenOptionsExt;
12 use std::os::unix::io::AsRawFd;
13 use std::path::{Path, PathBuf};
14 
15 /// Helper struct which will clean up temporary files once dropped.
16 struct TempFifo {
17     path: PathBuf,
18     _dir: tempfile::TempDir,
19 }
20 
21 impl TempFifo {
new(name: &str) -> io::Result<TempFifo>22     fn new(name: &str) -> io::Result<TempFifo> {
23         let dir = tempfile::Builder::new()
24             .prefix("tokio-fifo-tests")
25             .tempdir()?;
26         let path = dir.path().join(name);
27         nix::unistd::mkfifo(&path, nix::sys::stat::Mode::S_IRWXU)?;
28 
29         Ok(TempFifo { path, _dir: dir })
30     }
31 }
32 
33 impl AsRef<Path> for TempFifo {
as_ref(&self) -> &Path34     fn as_ref(&self) -> &Path {
35         self.path.as_ref()
36     }
37 }
38 
39 #[tokio::test]
fifo_simple_send() -> io::Result<()>40 async fn fifo_simple_send() -> io::Result<()> {
41     const DATA: &[u8] = b"this is some data to write to the fifo";
42 
43     let fifo = TempFifo::new("simple_send")?;
44 
45     // Create a reading task which should wait for data from the pipe.
46     let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
47     let mut read_fut = task::spawn(async move {
48         let mut buf = vec![0; DATA.len()];
49         reader.read_exact(&mut buf).await?;
50         Ok::<_, io::Error>(buf)
51     });
52     assert_pending!(read_fut.poll());
53 
54     let mut writer = pipe::OpenOptions::new().open_sender(&fifo)?;
55     writer.write_all(DATA).await?;
56 
57     // Let the IO driver poll events for the reader.
58     while !read_fut.is_woken() {
59         tokio::task::yield_now().await;
60     }
61 
62     // Reading task should be ready now.
63     let read_data = assert_ready_ok!(read_fut.poll());
64     assert_eq!(&read_data, DATA);
65 
66     Ok(())
67 }
68 
69 #[tokio::test]
70 #[cfg(target_os = "linux")]
fifo_simple_send_sender_first() -> io::Result<()>71 async fn fifo_simple_send_sender_first() -> io::Result<()> {
72     const DATA: &[u8] = b"this is some data to write to the fifo";
73 
74     // Create a new fifo file with *no reading ends open*.
75     let fifo = TempFifo::new("simple_send_sender_first")?;
76 
77     // Simple `open_sender` should fail with ENXIO (no such device or address).
78     let err = assert_err!(pipe::OpenOptions::new().open_sender(&fifo));
79     assert_eq!(err.raw_os_error(), Some(libc::ENXIO));
80 
81     // `open_sender` in read-write mode should succeed and the pipe should be ready to write.
82     let mut writer = pipe::OpenOptions::new()
83         .read_write(true)
84         .open_sender(&fifo)?;
85     writer.write_all(DATA).await?;
86 
87     // Read the written data and validate.
88     let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
89     let mut read_data = vec![0; DATA.len()];
90     reader.read_exact(&mut read_data).await?;
91     assert_eq!(&read_data, DATA);
92 
93     Ok(())
94 }
95 
96 // Opens a FIFO file, write and *close the writer*.
write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()>97 async fn write_and_close(path: impl AsRef<Path>, msg: &[u8]) -> io::Result<()> {
98     let mut writer = pipe::OpenOptions::new().open_sender(path)?;
99     writer.write_all(msg).await?;
100     drop(writer); // Explicit drop.
101     Ok(())
102 }
103 
104 /// Checks EOF behavior with single reader and writers sequentially opening
105 /// and closing a FIFO.
106 #[tokio::test]
fifo_multiple_writes() -> io::Result<()>107 async fn fifo_multiple_writes() -> io::Result<()> {
108     const DATA: &[u8] = b"this is some data to write to the fifo";
109 
110     let fifo = TempFifo::new("fifo_multiple_writes")?;
111 
112     let mut reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
113 
114     write_and_close(&fifo, DATA).await?;
115     let ev = reader.ready(Interest::READABLE).await?;
116     assert!(ev.is_readable());
117     let mut read_data = vec![0; DATA.len()];
118     assert_ok!(reader.read_exact(&mut read_data).await);
119 
120     // Check that reader hits EOF.
121     let err = assert_err!(reader.read_exact(&mut read_data).await);
122     assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
123 
124     // Write more data and read again.
125     write_and_close(&fifo, DATA).await?;
126     assert_ok!(reader.read_exact(&mut read_data).await);
127 
128     Ok(())
129 }
130 
131 /// Checks behavior of a resilient reader (Receiver in O_RDWR access mode)
132 /// with writers sequentially opening and closing a FIFO.
133 #[tokio::test]
134 #[cfg(target_os = "linux")]
fifo_resilient_reader() -> io::Result<()>135 async fn fifo_resilient_reader() -> io::Result<()> {
136     const DATA: &[u8] = b"this is some data to write to the fifo";
137 
138     let fifo = TempFifo::new("fifo_resilient_reader")?;
139 
140     // Open reader in read-write access mode.
141     let mut reader = pipe::OpenOptions::new()
142         .read_write(true)
143         .open_receiver(&fifo)?;
144 
145     write_and_close(&fifo, DATA).await?;
146     let ev = reader.ready(Interest::READABLE).await?;
147     let mut read_data = vec![0; DATA.len()];
148     reader.read_exact(&mut read_data).await?;
149 
150     // Check that reader didn't hit EOF.
151     assert!(!ev.is_read_closed());
152 
153     // Resilient reader can asynchronously wait for the next writer.
154     let mut second_read_fut = task::spawn(reader.read_exact(&mut read_data));
155     assert_pending!(second_read_fut.poll());
156 
157     // Write more data and read again.
158     write_and_close(&fifo, DATA).await?;
159     assert_ok!(second_read_fut.await);
160 
161     Ok(())
162 }
163 
164 #[tokio::test]
open_detects_not_a_fifo() -> io::Result<()>165 async fn open_detects_not_a_fifo() -> io::Result<()> {
166     let dir = tempfile::Builder::new()
167         .prefix("tokio-fifo-tests")
168         .tempdir()
169         .unwrap();
170     let path = dir.path().join("not_a_fifo");
171 
172     // Create an ordinary file.
173     File::create(&path)?;
174 
175     // Check if Sender detects invalid file type.
176     let err = assert_err!(pipe::OpenOptions::new().open_sender(&path));
177     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
178 
179     // Check if Receiver detects invalid file type.
180     let err = assert_err!(pipe::OpenOptions::new().open_sender(&path));
181     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
182 
183     Ok(())
184 }
185 
186 #[tokio::test]
from_file() -> io::Result<()>187 async fn from_file() -> io::Result<()> {
188     const DATA: &[u8] = b"this is some data to write to the fifo";
189 
190     let fifo = TempFifo::new("from_file")?;
191 
192     // Construct a Receiver from a File.
193     let file = std::fs::OpenOptions::new()
194         .read(true)
195         .custom_flags(libc::O_NONBLOCK)
196         .open(&fifo)?;
197     let mut reader = pipe::Receiver::from_file(file)?;
198 
199     // Construct a Sender from a File.
200     let file = std::fs::OpenOptions::new()
201         .write(true)
202         .custom_flags(libc::O_NONBLOCK)
203         .open(&fifo)?;
204     let mut writer = pipe::Sender::from_file(file)?;
205 
206     // Write and read some data to test async.
207     let mut read_fut = task::spawn(async move {
208         let mut buf = vec![0; DATA.len()];
209         reader.read_exact(&mut buf).await?;
210         Ok::<_, io::Error>(buf)
211     });
212     assert_pending!(read_fut.poll());
213 
214     writer.write_all(DATA).await?;
215 
216     let read_data = assert_ok!(read_fut.await);
217     assert_eq!(&read_data, DATA);
218 
219     Ok(())
220 }
221 
222 #[tokio::test]
from_file_detects_not_a_fifo() -> io::Result<()>223 async fn from_file_detects_not_a_fifo() -> io::Result<()> {
224     let dir = tempfile::Builder::new()
225         .prefix("tokio-fifo-tests")
226         .tempdir()
227         .unwrap();
228     let path = dir.path().join("not_a_fifo");
229 
230     // Create an ordinary file.
231     File::create(&path)?;
232 
233     // Check if Sender detects invalid file type.
234     let file = std::fs::OpenOptions::new().write(true).open(&path)?;
235     let err = assert_err!(pipe::Sender::from_file(file));
236     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
237 
238     // Check if Receiver detects invalid file type.
239     let file = std::fs::OpenOptions::new().read(true).open(&path)?;
240     let err = assert_err!(pipe::Receiver::from_file(file));
241     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
242 
243     Ok(())
244 }
245 
246 #[tokio::test]
from_file_detects_wrong_access_mode() -> io::Result<()>247 async fn from_file_detects_wrong_access_mode() -> io::Result<()> {
248     let fifo = TempFifo::new("wrong_access_mode")?;
249 
250     // Open a read end to open the fifo for writing.
251     let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
252 
253     // Check if Receiver detects write-only access mode.
254     let wronly = std::fs::OpenOptions::new()
255         .write(true)
256         .custom_flags(libc::O_NONBLOCK)
257         .open(&fifo)?;
258     let err = assert_err!(pipe::Receiver::from_file(wronly));
259     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
260 
261     // Check if Sender detects read-only access mode.
262     let rdonly = std::fs::OpenOptions::new()
263         .read(true)
264         .custom_flags(libc::O_NONBLOCK)
265         .open(&fifo)?;
266     let err = assert_err!(pipe::Sender::from_file(rdonly));
267     assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
268 
269     Ok(())
270 }
271 
is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool>272 fn is_nonblocking<T: AsRawFd>(fd: &T) -> io::Result<bool> {
273     let flags = nix::fcntl::fcntl(fd.as_raw_fd(), nix::fcntl::F_GETFL)?;
274     Ok((flags & libc::O_NONBLOCK) != 0)
275 }
276 
277 #[tokio::test]
from_file_sets_nonblock() -> io::Result<()>278 async fn from_file_sets_nonblock() -> io::Result<()> {
279     let fifo = TempFifo::new("sets_nonblock")?;
280 
281     // Open read and write ends to let blocking files open.
282     let _reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
283     let _writer = pipe::OpenOptions::new().open_sender(&fifo)?;
284 
285     // Check if Receiver sets the pipe in non-blocking mode.
286     let rdonly = std::fs::OpenOptions::new().read(true).open(&fifo)?;
287     assert!(!is_nonblocking(&rdonly)?);
288     let reader = pipe::Receiver::from_file(rdonly)?;
289     assert!(is_nonblocking(&reader)?);
290 
291     // Check if Sender sets the pipe in non-blocking mode.
292     let wronly = std::fs::OpenOptions::new().write(true).open(&fifo)?;
293     assert!(!is_nonblocking(&wronly)?);
294     let writer = pipe::Sender::from_file(wronly)?;
295     assert!(is_nonblocking(&writer)?);
296 
297     Ok(())
298 }
299 
writable_by_poll(writer: &pipe::Sender) -> bool300 fn writable_by_poll(writer: &pipe::Sender) -> bool {
301     task::spawn(writer.writable()).poll().is_ready()
302 }
303 
304 #[tokio::test]
try_read_write() -> io::Result<()>305 async fn try_read_write() -> io::Result<()> {
306     const DATA: &[u8] = b"this is some data to write to the fifo";
307 
308     // Create a pipe pair over a fifo file.
309     let fifo = TempFifo::new("try_read_write")?;
310     let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
311     let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
312 
313     // Fill the pipe buffer with `try_write`.
314     let mut write_data = Vec::new();
315     while writable_by_poll(&writer) {
316         match writer.try_write(DATA) {
317             Ok(n) => write_data.extend(&DATA[..n]),
318             Err(e) => {
319                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
320                 break;
321             }
322         }
323     }
324 
325     // Drain the pipe buffer with `try_read`.
326     let mut read_data = vec![0; write_data.len()];
327     let mut i = 0;
328     while i < write_data.len() {
329         reader.readable().await?;
330         match reader.try_read(&mut read_data[i..]) {
331             Ok(n) => i += n,
332             Err(e) => {
333                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
334                 continue;
335             }
336         }
337     }
338 
339     assert_eq!(read_data, write_data);
340 
341     Ok(())
342 }
343 
344 #[tokio::test]
try_read_write_vectored() -> io::Result<()>345 async fn try_read_write_vectored() -> io::Result<()> {
346     const DATA: &[u8] = b"this is some data to write to the fifo";
347 
348     // Create a pipe pair over a fifo file.
349     let fifo = TempFifo::new("try_read_write_vectored")?;
350     let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
351     let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
352 
353     let write_bufs: Vec<_> = DATA.chunks(3).map(io::IoSlice::new).collect();
354 
355     // Fill the pipe buffer with `try_write_vectored`.
356     let mut write_data = Vec::new();
357     while writable_by_poll(&writer) {
358         match writer.try_write_vectored(&write_bufs) {
359             Ok(n) => write_data.extend(&DATA[..n]),
360             Err(e) => {
361                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
362                 break;
363             }
364         }
365     }
366 
367     // Drain the pipe buffer with `try_read_vectored`.
368     let mut read_data = vec![0; write_data.len()];
369     let mut i = 0;
370     while i < write_data.len() {
371         reader.readable().await?;
372 
373         let mut read_bufs: Vec<_> = read_data[i..]
374             .chunks_mut(0x10000)
375             .map(io::IoSliceMut::new)
376             .collect();
377         match reader.try_read_vectored(&mut read_bufs) {
378             Ok(n) => i += n,
379             Err(e) => {
380                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
381                 continue;
382             }
383         }
384     }
385 
386     assert_eq!(read_data, write_data);
387 
388     Ok(())
389 }
390 
391 #[tokio::test]
try_read_buf() -> std::io::Result<()>392 async fn try_read_buf() -> std::io::Result<()> {
393     const DATA: &[u8] = b"this is some data to write to the fifo";
394 
395     // Create a pipe pair over a fifo file.
396     let fifo = TempFifo::new("try_read_write_vectored")?;
397     let reader = pipe::OpenOptions::new().open_receiver(&fifo)?;
398     let writer = pipe::OpenOptions::new().open_sender(&fifo)?;
399 
400     // Fill the pipe buffer with `try_write`.
401     let mut write_data = Vec::new();
402     while writable_by_poll(&writer) {
403         match writer.try_write(DATA) {
404             Ok(n) => write_data.extend(&DATA[..n]),
405             Err(e) => {
406                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
407                 break;
408             }
409         }
410     }
411 
412     // Drain the pipe buffer with `try_read_buf`.
413     let mut read_data = vec![0; write_data.len()];
414     let mut i = 0;
415     while i < write_data.len() {
416         reader.readable().await?;
417         match reader.try_read_buf(&mut read_data) {
418             Ok(n) => i += n,
419             Err(e) => {
420                 assert_eq!(e.kind(), io::ErrorKind::WouldBlock);
421                 continue;
422             }
423         }
424     }
425 
426     assert_eq!(read_data, write_data);
427 
428     Ok(())
429 }
430