• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::{
2     io::{Read, Seek, Write},
3     ops::Deref,
4     os::unix::io::AsRawFd,
5     pin::Pin,
6     sync::atomic::{AtomicBool, Ordering},
7     thread, time,
8 };
9 
10 use libc::c_int;
11 use nix::{
12     errno::*,
13     sys::{
14         aio::*,
15         signal::{
16             sigaction, SaFlags, SigAction, SigHandler, SigSet, SigevNotify,
17             Signal,
18         },
19         time::{TimeSpec, TimeValLike},
20     },
21 };
22 use tempfile::tempfile;
23 
24 lazy_static! {
25     pub static ref SIGNALED: AtomicBool = AtomicBool::new(false);
26 }
27 
sigfunc(_: c_int)28 extern "C" fn sigfunc(_: c_int) {
29     SIGNALED.store(true, Ordering::Relaxed);
30 }
31 
32 // Helper that polls an AioCb for completion or error
33 macro_rules! poll_aio {
34     ($aiocb: expr) => {
35         loop {
36             let err = $aiocb.as_mut().error();
37             if err != Err(Errno::EINPROGRESS) {
38                 break err;
39             };
40             thread::sleep(time::Duration::from_millis(10));
41         }
42     };
43 }
44 
45 mod aio_fsync {
46     use super::*;
47 
48     #[test]
test_accessors()49     fn test_accessors() {
50         let aiocb = AioFsync::new(
51             1001,
52             AioFsyncMode::O_SYNC,
53             42,
54             SigevNotify::SigevSignal {
55                 signal: Signal::SIGUSR2,
56                 si_value: 99,
57             },
58         );
59         assert_eq!(1001, aiocb.fd());
60         assert_eq!(AioFsyncMode::O_SYNC, aiocb.mode());
61         assert_eq!(42, aiocb.priority());
62         let sev = aiocb.sigevent().sigevent();
63         assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
64         assert_eq!(99, sev.sigev_value.sival_ptr as i64);
65     }
66 
67     /// `AioFsync::submit` should not modify the `AioCb` object if
68     /// `libc::aio_fsync` returns an error
69     // Skip on Linux, because Linux's AIO implementation can't detect errors
70     // synchronously
71     #[test]
72     #[cfg(any(target_os = "freebsd", target_os = "macos"))]
error()73     fn error() {
74         use std::mem;
75 
76         const INITIAL: &[u8] = b"abcdef123456";
77         // Create an invalid AioFsyncMode
78         let mode = unsafe { mem::transmute(666) };
79         let mut f = tempfile().unwrap();
80         f.write_all(INITIAL).unwrap();
81         let mut aiof = Box::pin(AioFsync::new(
82             f.as_raw_fd(),
83             mode,
84             0,
85             SigevNotify::SigevNone,
86         ));
87         let err = aiof.as_mut().submit();
88         err.expect_err("assertion failed");
89     }
90 
91     #[test]
92     #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
ok()93     fn ok() {
94         const INITIAL: &[u8] = b"abcdef123456";
95         let mut f = tempfile().unwrap();
96         f.write_all(INITIAL).unwrap();
97         let fd = f.as_raw_fd();
98         let mut aiof = Box::pin(AioFsync::new(
99             fd,
100             AioFsyncMode::O_SYNC,
101             0,
102             SigevNotify::SigevNone,
103         ));
104         aiof.as_mut().submit().unwrap();
105         poll_aio!(&mut aiof).unwrap();
106         aiof.as_mut().aio_return().unwrap();
107     }
108 }
109 
110 mod aio_read {
111     use super::*;
112 
113     #[test]
test_accessors()114     fn test_accessors() {
115         let mut rbuf = vec![0; 4];
116         let aiocb = AioRead::new(
117             1001,
118             2, //offset
119             &mut rbuf,
120             42, //priority
121             SigevNotify::SigevSignal {
122                 signal: Signal::SIGUSR2,
123                 si_value: 99,
124             },
125         );
126         assert_eq!(1001, aiocb.fd());
127         assert_eq!(4, aiocb.nbytes());
128         assert_eq!(2, aiocb.offset());
129         assert_eq!(42, aiocb.priority());
130         let sev = aiocb.sigevent().sigevent();
131         assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
132         assert_eq!(99, sev.sigev_value.sival_ptr as i64);
133     }
134 
135     // Tests AioWrite.cancel.  We aren't trying to test the OS's implementation,
136     // only our bindings.  So it's sufficient to check that cancel
137     // returned any AioCancelStat value.
138     #[test]
139     #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
cancel()140     fn cancel() {
141         const INITIAL: &[u8] = b"abcdef123456";
142         let mut rbuf = vec![0; 4];
143         let mut f = tempfile().unwrap();
144         f.write_all(INITIAL).unwrap();
145         let fd = f.as_raw_fd();
146         let mut aior =
147             Box::pin(AioRead::new(fd, 2, &mut rbuf, 0, SigevNotify::SigevNone));
148         aior.as_mut().submit().unwrap();
149 
150         aior.as_mut().cancel().unwrap();
151 
152         // Wait for aiow to complete, but don't care whether it succeeded
153         let _ = poll_aio!(&mut aior);
154         let _ = aior.as_mut().aio_return();
155     }
156 
157     /// `AioRead::submit` should not modify the `AioCb` object if
158     /// `libc::aio_read` returns an error
159     // Skip on Linux, because Linux's AIO implementation can't detect errors
160     // synchronously
161     #[test]
162     #[cfg(any(target_os = "freebsd", target_os = "macos"))]
error()163     fn error() {
164         const INITIAL: &[u8] = b"abcdef123456";
165         let mut rbuf = vec![0; 4];
166         let mut f = tempfile().unwrap();
167         f.write_all(INITIAL).unwrap();
168         let mut aior = Box::pin(AioRead::new(
169             f.as_raw_fd(),
170             -1, //an invalid offset
171             &mut rbuf,
172             0, //priority
173             SigevNotify::SigevNone,
174         ));
175         aior.as_mut().submit().expect_err("assertion failed");
176     }
177 
178     // Test a simple aio operation with no completion notification.  We must
179     // poll for completion
180     #[test]
181     #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
ok()182     fn ok() {
183         const INITIAL: &[u8] = b"abcdef123456";
184         let mut rbuf = vec![0; 4];
185         const EXPECT: &[u8] = b"cdef";
186         let mut f = tempfile().unwrap();
187         f.write_all(INITIAL).unwrap();
188         {
189             let fd = f.as_raw_fd();
190             let mut aior = Box::pin(AioRead::new(
191                 fd,
192                 2,
193                 &mut rbuf,
194                 0,
195                 SigevNotify::SigevNone,
196             ));
197             aior.as_mut().submit().unwrap();
198 
199             let err = poll_aio!(&mut aior);
200             assert_eq!(err, Ok(()));
201             assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len());
202         }
203         assert_eq!(EXPECT, rbuf.deref().deref());
204     }
205 
206     // Like ok, but allocates the structure on the stack.
207     #[test]
208     #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
on_stack()209     fn on_stack() {
210         const INITIAL: &[u8] = b"abcdef123456";
211         let mut rbuf = vec![0; 4];
212         const EXPECT: &[u8] = b"cdef";
213         let mut f = tempfile().unwrap();
214         f.write_all(INITIAL).unwrap();
215         {
216             let fd = f.as_raw_fd();
217             let mut aior =
218                 AioRead::new(fd, 2, &mut rbuf, 0, SigevNotify::SigevNone);
219             let mut aior = unsafe { Pin::new_unchecked(&mut aior) };
220             aior.as_mut().submit().unwrap();
221 
222             let err = poll_aio!(&mut aior);
223             assert_eq!(err, Ok(()));
224             assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len());
225         }
226         assert_eq!(EXPECT, rbuf.deref().deref());
227     }
228 }
229 
230 #[cfg(target_os = "freebsd")]
231 #[cfg(fbsd14)]
232 mod aio_readv {
233     use std::io::IoSliceMut;
234 
235     use super::*;
236 
237     #[test]
test_accessors()238     fn test_accessors() {
239         let mut rbuf0 = vec![0; 4];
240         let mut rbuf1 = vec![0; 8];
241         let mut rbufs =
242             [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];
243         let aiocb = AioReadv::new(
244             1001,
245             2, //offset
246             &mut rbufs,
247             42, //priority
248             SigevNotify::SigevSignal {
249                 signal: Signal::SIGUSR2,
250                 si_value: 99,
251             },
252         );
253         assert_eq!(1001, aiocb.fd());
254         assert_eq!(2, aiocb.iovlen());
255         assert_eq!(2, aiocb.offset());
256         assert_eq!(42, aiocb.priority());
257         let sev = aiocb.sigevent().sigevent();
258         assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
259         assert_eq!(99, sev.sigev_value.sival_ptr as i64);
260     }
261 
262     #[test]
263     #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
ok()264     fn ok() {
265         const INITIAL: &[u8] = b"abcdef123456";
266         let mut rbuf0 = vec![0; 4];
267         let mut rbuf1 = vec![0; 2];
268         let mut rbufs =
269             [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];
270         const EXPECT0: &[u8] = b"cdef";
271         const EXPECT1: &[u8] = b"12";
272         let mut f = tempfile().unwrap();
273         f.write_all(INITIAL).unwrap();
274         {
275             let fd = f.as_raw_fd();
276             let mut aior = Box::pin(AioReadv::new(
277                 fd,
278                 2,
279                 &mut rbufs,
280                 0,
281                 SigevNotify::SigevNone,
282             ));
283             aior.as_mut().submit().unwrap();
284 
285             let err = poll_aio!(&mut aior);
286             assert_eq!(err, Ok(()));
287             assert_eq!(
288                 aior.as_mut().aio_return().unwrap(),
289                 EXPECT0.len() + EXPECT1.len()
290             );
291         }
292         assert_eq!(&EXPECT0, &rbuf0);
293         assert_eq!(&EXPECT1, &rbuf1);
294     }
295 }
296 
297 mod aio_write {
298     use super::*;
299 
300     #[test]
test_accessors()301     fn test_accessors() {
302         let wbuf = vec![0; 4];
303         let aiocb = AioWrite::new(
304             1001,
305             2, //offset
306             &wbuf,
307             42, //priority
308             SigevNotify::SigevSignal {
309                 signal: Signal::SIGUSR2,
310                 si_value: 99,
311             },
312         );
313         assert_eq!(1001, aiocb.fd());
314         assert_eq!(4, aiocb.nbytes());
315         assert_eq!(2, aiocb.offset());
316         assert_eq!(42, aiocb.priority());
317         let sev = aiocb.sigevent().sigevent();
318         assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
319         assert_eq!(99, sev.sigev_value.sival_ptr as i64);
320     }
321 
322     // Tests AioWrite.cancel.  We aren't trying to test the OS's implementation,
323     // only our bindings.  So it's sufficient to check that cancel
324     // returned any AioCancelStat value.
325     #[test]
326     #[cfg_attr(target_env = "musl", ignore)]
cancel()327     fn cancel() {
328         let wbuf: &[u8] = b"CDEF";
329 
330         let f = tempfile().unwrap();
331         let mut aiow = Box::pin(AioWrite::new(
332             f.as_raw_fd(),
333             0,
334             wbuf,
335             0,
336             SigevNotify::SigevNone,
337         ));
338         aiow.as_mut().submit().unwrap();
339         let err = aiow.as_mut().error();
340         assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS));
341 
342         aiow.as_mut().cancel().unwrap();
343 
344         // Wait for aiow to complete, but don't care whether it succeeded
345         let _ = poll_aio!(&mut aiow);
346         let _ = aiow.as_mut().aio_return();
347     }
348 
349     // Test a simple aio operation with no completion notification.  We must
350     // poll for completion.
351     #[test]
352     #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
ok()353     fn ok() {
354         const INITIAL: &[u8] = b"abcdef123456";
355         let wbuf = "CDEF".to_string().into_bytes();
356         let mut rbuf = Vec::new();
357         const EXPECT: &[u8] = b"abCDEF123456";
358 
359         let mut f = tempfile().unwrap();
360         f.write_all(INITIAL).unwrap();
361         let mut aiow = Box::pin(AioWrite::new(
362             f.as_raw_fd(),
363             2,
364             &wbuf,
365             0,
366             SigevNotify::SigevNone,
367         ));
368         aiow.as_mut().submit().unwrap();
369 
370         let err = poll_aio!(&mut aiow);
371         assert_eq!(err, Ok(()));
372         assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len());
373 
374         f.rewind().unwrap();
375         let len = f.read_to_end(&mut rbuf).unwrap();
376         assert_eq!(len, EXPECT.len());
377         assert_eq!(rbuf, EXPECT);
378     }
379 
380     // Like ok, but allocates the structure on the stack.
381     #[test]
382     #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
on_stack()383     fn on_stack() {
384         const INITIAL: &[u8] = b"abcdef123456";
385         let wbuf = "CDEF".to_string().into_bytes();
386         let mut rbuf = Vec::new();
387         const EXPECT: &[u8] = b"abCDEF123456";
388 
389         let mut f = tempfile().unwrap();
390         f.write_all(INITIAL).unwrap();
391         let mut aiow = AioWrite::new(
392             f.as_raw_fd(),
393             2, //offset
394             &wbuf,
395             0, //priority
396             SigevNotify::SigevNone,
397         );
398         let mut aiow = unsafe { Pin::new_unchecked(&mut aiow) };
399         aiow.as_mut().submit().unwrap();
400 
401         let err = poll_aio!(&mut aiow);
402         assert_eq!(err, Ok(()));
403         assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len());
404 
405         f.rewind().unwrap();
406         let len = f.read_to_end(&mut rbuf).unwrap();
407         assert_eq!(len, EXPECT.len());
408         assert_eq!(rbuf, EXPECT);
409     }
410 
411     /// `AioWrite::write` should not modify the `AioCb` object if
412     /// `libc::aio_write` returns an error.
413     // Skip on Linux, because Linux's AIO implementation can't detect errors
414     // synchronously
415     #[test]
416     #[cfg(any(target_os = "freebsd", target_os = "macos"))]
error()417     fn error() {
418         let wbuf = "CDEF".to_string().into_bytes();
419         let mut aiow = Box::pin(AioWrite::new(
420             666, // An invalid file descriptor
421             0,   //offset
422             &wbuf,
423             0, //priority
424             SigevNotify::SigevNone,
425         ));
426         aiow.as_mut().submit().expect_err("assertion failed");
427         // Dropping the AioWrite at this point should not panic
428     }
429 }
430 
431 #[cfg(target_os = "freebsd")]
432 #[cfg(fbsd14)]
433 mod aio_writev {
434     use std::io::IoSlice;
435 
436     use super::*;
437 
438     #[test]
test_accessors()439     fn test_accessors() {
440         let wbuf0 = vec![0; 4];
441         let wbuf1 = vec![0; 8];
442         let wbufs = [IoSlice::new(&wbuf0), IoSlice::new(&wbuf1)];
443         let aiocb = AioWritev::new(
444             1001,
445             2, //offset
446             &wbufs,
447             42, //priority
448             SigevNotify::SigevSignal {
449                 signal: Signal::SIGUSR2,
450                 si_value: 99,
451             },
452         );
453         assert_eq!(1001, aiocb.fd());
454         assert_eq!(2, aiocb.iovlen());
455         assert_eq!(2, aiocb.offset());
456         assert_eq!(42, aiocb.priority());
457         let sev = aiocb.sigevent().sigevent();
458         assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
459         assert_eq!(99, sev.sigev_value.sival_ptr as i64);
460     }
461 
462     // Test a simple aio operation with no completion notification.  We must
463     // poll for completion.
464     #[test]
465     #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
ok()466     fn ok() {
467         const INITIAL: &[u8] = b"abcdef123456";
468         let wbuf0 = b"BC";
469         let wbuf1 = b"DEF";
470         let wbufs = [IoSlice::new(wbuf0), IoSlice::new(wbuf1)];
471         let wlen = wbuf0.len() + wbuf1.len();
472         let mut rbuf = Vec::new();
473         const EXPECT: &[u8] = b"aBCDEF123456";
474 
475         let mut f = tempfile().unwrap();
476         f.write_all(INITIAL).unwrap();
477         let mut aiow = Box::pin(AioWritev::new(
478             f.as_raw_fd(),
479             1,
480             &wbufs,
481             0,
482             SigevNotify::SigevNone,
483         ));
484         aiow.as_mut().submit().unwrap();
485 
486         let err = poll_aio!(&mut aiow);
487         assert_eq!(err, Ok(()));
488         assert_eq!(aiow.as_mut().aio_return().unwrap(), wlen);
489 
490         f.rewind().unwrap();
491         let len = f.read_to_end(&mut rbuf).unwrap();
492         assert_eq!(len, EXPECT.len());
493         assert_eq!(rbuf, EXPECT);
494     }
495 }
496 
497 // Test an aio operation with completion delivered by a signal
498 #[test]
499 #[cfg_attr(
500     any(
501         all(target_env = "musl", target_arch = "x86_64"),
502         target_arch = "mips",
503         target_arch = "mips64"
504     ),
505     ignore
506 )]
sigev_signal()507 fn sigev_signal() {
508     let _m = crate::SIGNAL_MTX.lock();
509     let sa = SigAction::new(
510         SigHandler::Handler(sigfunc),
511         SaFlags::SA_RESETHAND,
512         SigSet::empty(),
513     );
514     SIGNALED.store(false, Ordering::Relaxed);
515     unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap();
516 
517     const INITIAL: &[u8] = b"abcdef123456";
518     const WBUF: &[u8] = b"CDEF";
519     let mut rbuf = Vec::new();
520     const EXPECT: &[u8] = b"abCDEF123456";
521 
522     let mut f = tempfile().unwrap();
523     f.write_all(INITIAL).unwrap();
524     let mut aiow = Box::pin(AioWrite::new(
525         f.as_raw_fd(),
526         2, //offset
527         WBUF,
528         0, //priority
529         SigevNotify::SigevSignal {
530             signal: Signal::SIGUSR2,
531             si_value: 0, //TODO: validate in sigfunc
532         },
533     ));
534     aiow.as_mut().submit().unwrap();
535     while !SIGNALED.load(Ordering::Relaxed) {
536         thread::sleep(time::Duration::from_millis(10));
537     }
538 
539     assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
540     f.rewind().unwrap();
541     let len = f.read_to_end(&mut rbuf).unwrap();
542     assert_eq!(len, EXPECT.len());
543     assert_eq!(rbuf, EXPECT);
544 }
545 
546 // Tests using aio_cancel_all for all outstanding IOs.
547 #[test]
548 #[cfg_attr(target_env = "musl", ignore)]
test_aio_cancel_all()549 fn test_aio_cancel_all() {
550     let wbuf: &[u8] = b"CDEF";
551 
552     let f = tempfile().unwrap();
553     let mut aiocb = Box::pin(AioWrite::new(
554         f.as_raw_fd(),
555         0, //offset
556         wbuf,
557         0, //priority
558         SigevNotify::SigevNone,
559     ));
560     aiocb.as_mut().submit().unwrap();
561     let err = aiocb.as_mut().error();
562     assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS));
563 
564     aio_cancel_all(f.as_raw_fd()).unwrap();
565 
566     // Wait for aiocb to complete, but don't care whether it succeeded
567     let _ = poll_aio!(&mut aiocb);
568     let _ = aiocb.as_mut().aio_return();
569 }
570 
571 #[test]
572 // On Cirrus on Linux, this test fails due to a glibc bug.
573 // https://github.com/nix-rust/nix/issues/1099
574 #[cfg_attr(target_os = "linux", ignore)]
575 // On Cirrus, aio_suspend is failing with EINVAL
576 // https://github.com/nix-rust/nix/issues/1361
577 #[cfg_attr(target_os = "macos", ignore)]
test_aio_suspend()578 fn test_aio_suspend() {
579     const INITIAL: &[u8] = b"abcdef123456";
580     const WBUF: &[u8] = b"CDEFG";
581     let timeout = TimeSpec::seconds(10);
582     let mut rbuf = vec![0; 4];
583     let rlen = rbuf.len();
584     let mut f = tempfile().unwrap();
585     f.write_all(INITIAL).unwrap();
586 
587     let mut wcb = Box::pin(AioWrite::new(
588         f.as_raw_fd(),
589         2, //offset
590         WBUF,
591         0, //priority
592         SigevNotify::SigevNone,
593     ));
594 
595     let mut rcb = Box::pin(AioRead::new(
596         f.as_raw_fd(),
597         8, //offset
598         &mut rbuf,
599         0, //priority
600         SigevNotify::SigevNone,
601     ));
602     wcb.as_mut().submit().unwrap();
603     rcb.as_mut().submit().unwrap();
604     loop {
605         {
606             let cbbuf = [
607                 &*wcb as &dyn AsRef<libc::aiocb>,
608                 &*rcb as &dyn AsRef<libc::aiocb>,
609             ];
610             let r = aio_suspend(&cbbuf[..], Some(timeout));
611             match r {
612                 Err(Errno::EINTR) => continue,
613                 Err(e) => panic!("aio_suspend returned {:?}", e),
614                 Ok(_) => (),
615             };
616         }
617         if rcb.as_mut().error() != Err(Errno::EINPROGRESS)
618             && wcb.as_mut().error() != Err(Errno::EINPROGRESS)
619         {
620             break;
621         }
622     }
623 
624     assert_eq!(wcb.as_mut().aio_return().unwrap(), WBUF.len());
625     assert_eq!(rcb.as_mut().aio_return().unwrap(), rlen);
626 }
627