1 use std::{
2 io::{Read, Seek, Write},
3 ops::Deref,
4 os::unix::io::AsRawFd,
5 pin::Pin,
6 sync::atomic::{AtomicBool, Ordering},
7 thread, time,
8 };
9
10 use libc::c_int;
11 use nix::{
12 errno::*,
13 sys::{
14 aio::*,
15 signal::{
16 sigaction, SaFlags, SigAction, SigHandler, SigSet, SigevNotify,
17 Signal,
18 },
19 time::{TimeSpec, TimeValLike},
20 },
21 };
22 use tempfile::tempfile;
23
/// Flag raised by `sigfunc` when the AIO completion signal is delivered.
///
/// `AtomicBool::new` is a `const fn`, so a plain `static` suffices; no lazy
/// initialisation is needed, which also keeps the signal handler from ever
/// touching one-time-initialisation machinery.
pub static SIGNALED: AtomicBool = AtomicBool::new(false);
27
sigfunc(_: c_int)28 extern "C" fn sigfunc(_: c_int) {
29 SIGNALED.store(true, Ordering::Relaxed);
30 }
31
// Helper that polls an AioCb until it is no longer in progress.
//
// Evaluates to the first result of `error()` that is not
// `Err(Errno::EINPROGRESS)`, sleeping 10 ms between attempts.
macro_rules! poll_aio {
    ($aiocb: expr) => {
        loop {
            let status = $aiocb.as_mut().error();
            match status {
                // Still running: back off briefly and ask again.
                Err(Errno::EINPROGRESS) => {
                    thread::sleep(time::Duration::from_millis(10))
                }
                // Finished, successfully or not: hand the result back.
                _ => break status,
            }
        }
    };
}
44
// Tests for `AioFsync`.
mod aio_fsync {
    use super::*;

    // The values given to `AioFsync::new` must be reported back unchanged by
    // the accessors.
    #[test]
    fn test_accessors() {
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let aiocb = AioFsync::new(1001, AioFsyncMode::O_SYNC, 42, notify);
        assert_eq!(1001, aiocb.fd());
        assert_eq!(AioFsyncMode::O_SYNC, aiocb.mode());
        assert_eq!(42, aiocb.priority());
        let sev = aiocb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
        assert_eq!(99, sev.sigev_value.sival_ptr as i64);
    }

    /// `AioFsync::submit` should not modify the `AioCb` object if
    /// `libc::aio_fsync` returns an error
    // Skip on Linux, because Linux's AIO implementation can't detect errors
    // synchronously
    #[test]
    #[cfg(any(target_os = "freebsd", target_os = "macos"))]
    fn error() {
        use std::mem;

        const INITIAL: &[u8] = b"abcdef123456";
        let mut f = tempfile().unwrap();
        f.write_all(INITIAL).unwrap();
        // Create an invalid AioFsyncMode.
        // NOTE(review): 666 is presumably not a valid `AioFsyncMode`
        // discriminant, which makes this transmute formally undefined
        // behaviour -- confirm whether the test can be expressed another way.
        let bogus_mode = unsafe { mem::transmute::<i32, AioFsyncMode>(666) };
        let mut aiof = Box::pin(AioFsync::new(
            f.as_raw_fd(),
            bogus_mode,
            0,
            SigevNotify::SigevNone,
        ));
        aiof.as_mut().submit().expect_err("assertion failed");
    }

    // Submit an fsync with no completion notification, poll until it is
    // done, then collect its return value.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        let mut f = tempfile().unwrap();
        f.write_all(INITIAL).unwrap();
        let mut aiof = Box::pin(AioFsync::new(
            f.as_raw_fd(),
            AioFsyncMode::O_SYNC,
            0,
            SigevNotify::SigevNone,
        ));
        aiof.as_mut().submit().unwrap();
        poll_aio!(&mut aiof).unwrap();
        aiof.as_mut().aio_return().unwrap();
    }
}
109
// Tests for `AioRead`.
mod aio_read {
    use super::*;

    // The values given to `AioRead::new` must be reported back unchanged by
    // the accessors; `nbytes` reflects the length of the buffer.
    #[test]
    fn test_accessors() {
        let mut rbuf = vec![0; 4];
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let aiocb = AioRead::new(
            1001,
            2, //offset
            &mut rbuf,
            42, //priority
            notify,
        );
        assert_eq!(1001, aiocb.fd());
        assert_eq!(4, aiocb.nbytes());
        assert_eq!(2, aiocb.offset());
        assert_eq!(42, aiocb.priority());
        let sev = aiocb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
        assert_eq!(99, sev.sigev_value.sival_ptr as i64);
    }

    // Tests AioRead.cancel.  We aren't trying to test the OS's implementation,
    // only our bindings.  So it's sufficient to check that cancel
    // returned any AioCancelStat value.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn cancel() {
        const INITIAL: &[u8] = b"abcdef123456";
        let mut rbuf = vec![0; 4];
        let mut f = tempfile().unwrap();
        f.write_all(INITIAL).unwrap();
        let mut aior = Box::pin(AioRead::new(
            f.as_raw_fd(),
            2,
            &mut rbuf,
            0,
            SigevNotify::SigevNone,
        ));
        aior.as_mut().submit().unwrap();

        aior.as_mut().cancel().unwrap();

        // Wait for aior to complete, but don't care whether it succeeded
        let _ = poll_aio!(&mut aior);
        let _ = aior.as_mut().aio_return();
    }

    /// `AioRead::submit` should not modify the `AioCb` object if
    /// `libc::aio_read` returns an error
    // Skip on Linux, because Linux's AIO implementation can't detect errors
    // synchronously
    #[test]
    #[cfg(any(target_os = "freebsd", target_os = "macos"))]
    fn error() {
        const INITIAL: &[u8] = b"abcdef123456";
        let mut rbuf = vec![0; 4];
        let mut f = tempfile().unwrap();
        f.write_all(INITIAL).unwrap();
        let mut aior = Box::pin(AioRead::new(
            f.as_raw_fd(),
            -1, //an invalid offset
            &mut rbuf,
            0, //priority
            SigevNotify::SigevNone,
        ));
        let outcome = aior.as_mut().submit();
        outcome.expect_err("assertion failed");
    }

    // Test a simple aio operation with no completion notification.  We must
    // poll for completion
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"cdef";
        let mut rbuf = vec![0; 4];
        let mut f = tempfile().unwrap();
        f.write_all(INITIAL).unwrap();
        {
            // Scope the operation so its borrow of `rbuf` ends before the
            // buffer is inspected.
            let mut aior = Box::pin(AioRead::new(
                f.as_raw_fd(),
                2,
                &mut rbuf,
                0,
                SigevNotify::SigevNone,
            ));
            aior.as_mut().submit().unwrap();

            let status = poll_aio!(&mut aior);
            assert_eq!(status, Ok(()));
            assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len());
        }
        assert_eq!(EXPECT, rbuf.deref());
    }

    // Like ok, but allocates the structure on the stack.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn on_stack() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"cdef";
        let mut rbuf = vec![0; 4];
        let mut f = tempfile().unwrap();
        f.write_all(INITIAL).unwrap();
        {
            let mut aior = AioRead::new(
                f.as_raw_fd(),
                2,
                &mut rbuf,
                0,
                SigevNotify::SigevNone,
            );
            // SAFETY: the unpinned value is shadowed immediately, so it
            // cannot be named (and therefore moved) after being pinned.
            let mut aior = unsafe { Pin::new_unchecked(&mut aior) };
            aior.as_mut().submit().unwrap();

            let status = poll_aio!(&mut aior);
            assert_eq!(status, Ok(()));
            assert_eq!(aior.as_mut().aio_return().unwrap(), EXPECT.len());
        }
        assert_eq!(EXPECT, rbuf.deref());
    }
}
229
// Tests for `AioReadv`; only built on FreeBSD when `fbsd14` is set.
#[cfg(all(target_os = "freebsd", fbsd14))]
mod aio_readv {
    use std::io::IoSliceMut;

    use super::*;

    // The values given to `AioReadv::new` must be reported back unchanged by
    // the accessors; `iovlen` reflects the number of buffers.
    #[test]
    fn test_accessors() {
        let mut rbuf0 = vec![0; 4];
        let mut rbuf1 = vec![0; 8];
        let mut rbufs =
            [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let aiocb = AioReadv::new(
            1001,
            2, //offset
            &mut rbufs,
            42, //priority
            notify,
        );
        assert_eq!(1001, aiocb.fd());
        assert_eq!(2, aiocb.iovlen());
        assert_eq!(2, aiocb.offset());
        assert_eq!(42, aiocb.priority());
        let sev = aiocb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
        assert_eq!(99, sev.sigev_value.sival_ptr as i64);
    }

    // Scatter a read starting at offset 2 across a 4-byte and a 2-byte
    // buffer, polling for completion.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT0: &[u8] = b"cdef";
        const EXPECT1: &[u8] = b"12";
        let mut rbuf0 = vec![0; 4];
        let mut rbuf1 = vec![0; 2];
        let mut rbufs =
            [IoSliceMut::new(&mut rbuf0), IoSliceMut::new(&mut rbuf1)];
        let mut f = tempfile().unwrap();
        f.write_all(INITIAL).unwrap();
        {
            let mut aior = Box::pin(AioReadv::new(
                f.as_raw_fd(),
                2,
                &mut rbufs,
                0,
                SigevNotify::SigevNone,
            ));
            aior.as_mut().submit().unwrap();

            let status = poll_aio!(&mut aior);
            assert_eq!(status, Ok(()));
            let total = EXPECT0.len() + EXPECT1.len();
            assert_eq!(aior.as_mut().aio_return().unwrap(), total);
        }
        assert_eq!(&EXPECT0, &rbuf0);
        assert_eq!(&EXPECT1, &rbuf1);
    }
}
296
// Tests for `AioWrite`.
mod aio_write {
    use super::*;

    // The values given to `AioWrite::new` must be reported back unchanged by
    // the accessors; `nbytes` reflects the length of the buffer.
    #[test]
    fn test_accessors() {
        let wbuf = vec![0; 4];
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let aiocb = AioWrite::new(
            1001,
            2, //offset
            &wbuf,
            42, //priority
            notify,
        );
        assert_eq!(1001, aiocb.fd());
        assert_eq!(4, aiocb.nbytes());
        assert_eq!(2, aiocb.offset());
        assert_eq!(42, aiocb.priority());
        let sev = aiocb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
        assert_eq!(99, sev.sigev_value.sival_ptr as i64);
    }

    // Tests AioWrite.cancel.  We aren't trying to test the OS's implementation,
    // only our bindings.  So it's sufficient to check that cancel
    // returned any AioCancelStat value.
    #[test]
    #[cfg_attr(target_env = "musl", ignore)]
    fn cancel() {
        const WBUF: &[u8] = b"CDEF";

        let f = tempfile().unwrap();
        let mut aiow = Box::pin(AioWrite::new(
            f.as_raw_fd(),
            0,
            WBUF,
            0,
            SigevNotify::SigevNone,
        ));
        aiow.as_mut().submit().unwrap();
        // Right after submission the write is either done or still running.
        let err = aiow.as_mut().error();
        assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS));

        aiow.as_mut().cancel().unwrap();

        // Wait for aiow to complete, but don't care whether it succeeded
        let _ = poll_aio!(&mut aiow);
        let _ = aiow.as_mut().aio_return();
    }

    // Test a simple aio operation with no completion notification.  We must
    // poll for completion.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"abCDEF123456";
        let wbuf = b"CDEF".to_vec();
        let mut rbuf = Vec::new();

        let mut f = tempfile().unwrap();
        f.write_all(INITIAL).unwrap();
        let mut aiow = Box::pin(AioWrite::new(
            f.as_raw_fd(),
            2,
            &wbuf,
            0,
            SigevNotify::SigevNone,
        ));
        aiow.as_mut().submit().unwrap();

        let status = poll_aio!(&mut aiow);
        assert_eq!(status, Ok(()));
        assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len());

        // Read the whole file back to verify what landed on disk.
        f.rewind().unwrap();
        let nread = f.read_to_end(&mut rbuf).unwrap();
        assert_eq!(nread, EXPECT.len());
        assert_eq!(rbuf, EXPECT);
    }

    // Like ok, but allocates the structure on the stack.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn on_stack() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"abCDEF123456";
        let wbuf = b"CDEF".to_vec();
        let mut rbuf = Vec::new();

        let mut f = tempfile().unwrap();
        f.write_all(INITIAL).unwrap();
        let mut aiow = AioWrite::new(
            f.as_raw_fd(),
            2, //offset
            &wbuf,
            0, //priority
            SigevNotify::SigevNone,
        );
        // SAFETY: the unpinned value is shadowed immediately, so it cannot be
        // named (and therefore moved) after being pinned.
        let mut aiow = unsafe { Pin::new_unchecked(&mut aiow) };
        aiow.as_mut().submit().unwrap();

        let status = poll_aio!(&mut aiow);
        assert_eq!(status, Ok(()));
        assert_eq!(aiow.as_mut().aio_return().unwrap(), wbuf.len());

        // Read the whole file back to verify what landed on disk.
        f.rewind().unwrap();
        let nread = f.read_to_end(&mut rbuf).unwrap();
        assert_eq!(nread, EXPECT.len());
        assert_eq!(rbuf, EXPECT);
    }

    /// `AioWrite::submit` should not modify the `AioCb` object if
    /// `libc::aio_write` returns an error.
    // Skip on Linux, because Linux's AIO implementation can't detect errors
    // synchronously
    #[test]
    #[cfg(any(target_os = "freebsd", target_os = "macos"))]
    fn error() {
        let wbuf = b"CDEF".to_vec();
        let mut aiow = Box::pin(AioWrite::new(
            666, // An invalid file descriptor
            0,   //offset
            &wbuf,
            0, //priority
            SigevNotify::SigevNone,
        ));
        let outcome = aiow.as_mut().submit();
        outcome.expect_err("assertion failed");
        // Dropping the AioWrite at this point should not panic
    }
}
430
// Tests for `AioWritev`; only built on FreeBSD when `fbsd14` is set.
#[cfg(all(target_os = "freebsd", fbsd14))]
mod aio_writev {
    use std::io::IoSlice;

    use super::*;

    // The values given to `AioWritev::new` must be reported back unchanged
    // by the accessors; `iovlen` reflects the number of buffers.
    #[test]
    fn test_accessors() {
        let wbuf0 = vec![0; 4];
        let wbuf1 = vec![0; 8];
        let wbufs = [IoSlice::new(&wbuf0), IoSlice::new(&wbuf1)];
        let notify = SigevNotify::SigevSignal {
            signal: Signal::SIGUSR2,
            si_value: 99,
        };
        let aiocb = AioWritev::new(
            1001,
            2, //offset
            &wbufs,
            42, //priority
            notify,
        );
        assert_eq!(1001, aiocb.fd());
        assert_eq!(2, aiocb.iovlen());
        assert_eq!(2, aiocb.offset());
        assert_eq!(42, aiocb.priority());
        let sev = aiocb.sigevent().sigevent();
        assert_eq!(Signal::SIGUSR2 as i32, sev.sigev_signo);
        assert_eq!(99, sev.sigev_value.sival_ptr as i64);
    }

    // Test a simple aio operation with no completion notification.  We must
    // poll for completion.
    #[test]
    #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)]
    fn ok() {
        const INITIAL: &[u8] = b"abcdef123456";
        const EXPECT: &[u8] = b"aBCDEF123456";
        let wbuf0 = b"BC";
        let wbuf1 = b"DEF";
        let wbufs = [IoSlice::new(wbuf0), IoSlice::new(wbuf1)];
        let wlen = wbuf0.len() + wbuf1.len();
        let mut rbuf = Vec::new();

        let mut f = tempfile().unwrap();
        f.write_all(INITIAL).unwrap();
        let mut aiow = Box::pin(AioWritev::new(
            f.as_raw_fd(),
            1,
            &wbufs,
            0,
            SigevNotify::SigevNone,
        ));
        aiow.as_mut().submit().unwrap();

        let status = poll_aio!(&mut aiow);
        assert_eq!(status, Ok(()));
        assert_eq!(aiow.as_mut().aio_return().unwrap(), wlen);

        // Read the whole file back to verify what landed on disk.
        f.rewind().unwrap();
        let nread = f.read_to_end(&mut rbuf).unwrap();
        assert_eq!(nread, EXPECT.len());
        assert_eq!(rbuf, EXPECT);
    }
}
496
// Test an aio operation with completion delivered by a signal
#[test]
#[cfg_attr(
    any(
        all(target_env = "musl", target_arch = "x86_64"),
        target_arch = "mips",
        target_arch = "mips64"
    ),
    ignore
)]
fn sigev_signal() {
    const INITIAL: &[u8] = b"abcdef123456";
    const WBUF: &[u8] = b"CDEF";
    const EXPECT: &[u8] = b"abCDEF123456";

    // The guard is held until the end of the test; presumably it serialises
    // tests that install signal handlers -- confirm at `SIGNAL_MTX`.
    let _m = crate::SIGNAL_MTX.lock();
    let action = SigAction::new(
        SigHandler::Handler(sigfunc),
        SaFlags::SA_RESETHAND,
        SigSet::empty(),
    );
    // Clear the flag before the handler that sets it is installed.
    SIGNALED.store(false, Ordering::Relaxed);
    unsafe { sigaction(Signal::SIGUSR2, &action) }.unwrap();

    let mut rbuf = Vec::new();

    let mut f = tempfile().unwrap();
    f.write_all(INITIAL).unwrap();
    let notify = SigevNotify::SigevSignal {
        signal: Signal::SIGUSR2,
        si_value: 0, //TODO: validate in sigfunc
    };
    let mut aiow = Box::pin(AioWrite::new(
        f.as_raw_fd(),
        2, //offset
        WBUF,
        0, //priority
        notify,
    ));
    aiow.as_mut().submit().unwrap();
    // Wait for the handler to raise the flag.  NOTE: there is no timeout, so
    // this spins forever if the signal is never delivered.
    loop {
        if SIGNALED.load(Ordering::Relaxed) {
            break;
        }
        thread::sleep(time::Duration::from_millis(10));
    }

    assert_eq!(aiow.as_mut().aio_return().unwrap(), WBUF.len());
    // Read the whole file back to verify what landed on disk.
    f.rewind().unwrap();
    let nread = f.read_to_end(&mut rbuf).unwrap();
    assert_eq!(nread, EXPECT.len());
    assert_eq!(rbuf, EXPECT);
}
545
// Tests using aio_cancel_all for all outstanding IOs.
#[test]
#[cfg_attr(target_env = "musl", ignore)]
fn test_aio_cancel_all() {
    const WBUF: &[u8] = b"CDEF";

    let f = tempfile().unwrap();
    let mut aiocb = Box::pin(AioWrite::new(
        f.as_raw_fd(),
        0, //offset
        WBUF,
        0, //priority
        SigevNotify::SigevNone,
    ));
    aiocb.as_mut().submit().unwrap();
    // Right after submission the write is either done or still running.
    let err = aiocb.as_mut().error();
    assert!(err == Ok(()) || err == Err(Errno::EINPROGRESS));

    // Cancel by file descriptor rather than through the control block.
    aio_cancel_all(f.as_raw_fd()).unwrap();

    // Wait for aiocb to complete, but don't care whether it succeeded
    let _ = poll_aio!(&mut aiocb);
    let _ = aiocb.as_mut().aio_return();
}
570
// Submits one write and one read against the same file, then uses
// aio_suspend to wait until neither is still in progress.
#[test]
// On Cirrus on Linux, this test fails due to a glibc bug.
// https://github.com/nix-rust/nix/issues/1099
#[cfg_attr(target_os = "linux", ignore)]
// On Cirrus, aio_suspend is failing with EINVAL
// https://github.com/nix-rust/nix/issues/1361
#[cfg_attr(target_os = "macos", ignore)]
fn test_aio_suspend() {
    const INITIAL: &[u8] = b"abcdef123456";
    const WBUF: &[u8] = b"CDEFG";
    let timeout = TimeSpec::seconds(10);
    let mut rbuf = vec![0; 4];
    let rlen = rbuf.len();
    let mut f = tempfile().unwrap();
    f.write_all(INITIAL).unwrap();

    let mut wcb = Box::pin(AioWrite::new(
        f.as_raw_fd(),
        2, //offset
        WBUF,
        0, //priority
        SigevNotify::SigevNone,
    ));

    let mut rcb = Box::pin(AioRead::new(
        f.as_raw_fd(),
        8, //offset
        &mut rbuf,
        0, //priority
        SigevNotify::SigevNone,
    ));
    wcb.as_mut().submit().unwrap();
    rcb.as_mut().submit().unwrap();
    loop {
        {
            // Scoped so the shared borrows of both control blocks end
            // before they are borrowed mutably below.
            let pending: [&dyn AsRef<libc::aiocb>; 2] = [&*wcb, &*rcb];
            match aio_suspend(&pending[..], Some(timeout)) {
                Ok(_) => (),
                // Interrupted by a signal: just wait again.
                Err(Errno::EINTR) => continue,
                Err(e) => panic!("aio_suspend returned {:?}", e),
            }
        }
        // Keep waiting until both operations have left the in-progress
        // state; the write is only queried once the read is done.
        let read_done = rcb.as_mut().error() != Err(Errno::EINPROGRESS);
        if read_done && wcb.as_mut().error() != Err(Errno::EINPROGRESS) {
            break;
        }
    }

    assert_eq!(wcb.as_mut().aio_return().unwrap(), WBUF.len());
    assert_eq!(rcb.as_mut().aio_return().unwrap(), rlen);
}
627