#![warn(rust_2018_idioms)]
#![cfg(all(unix, feature = "full"))]

use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::Duration;
use std::{
    future::Future,
    io::{self, ErrorKind, Read, Write},
    task::{Context, Waker},
};

use nix::unistd::{close, read, write};

use futures::poll;

use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
use tokio_test::{assert_err, assert_pending};
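// A waker for manual polling in tests: it records whether it has been awoken
// so tests can assert that a wakeup did (or did not) happen.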
struct TestWaker {
    inner: Arc<TestWakerInner>,
    waker: Waker,
}

#[derive(Default)]
struct TestWakerInner {
    awoken: AtomicBool,
}

impl futures::task::ArcWake for TestWakerInner {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.awoken.store(true, Ordering::SeqCst);
    }
}

impl TestWaker {
    fn new() -> Self {
        let inner: Arc<TestWakerInner> = Default::default();

        Self {
            inner: inner.clone(),
            waker: futures::task::waker(inner),
        }
    }

    fn awoken(&self) -> bool {
        self.inner.awoken.swap(false, Ordering::SeqCst)
    }

    fn context(&self) -> Context<'_> {
        Context::from_waker(&self.waker)
    }
}
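// Thin wrapper around a raw Unix file descriptor: implements `AsRawFd`,
// `Read`, and `Write`, and closes the fd on drop. Used as the inner type for
// `AsyncFd` in these tests.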
#[derive(Debug)]
struct FileDescriptor {
    fd: RawFd,
}

impl AsRawFd for FileDescriptor {
    fn as_raw_fd(&self) -> RawFd {
        self.fd
    }
}

impl Read for &FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        read(self.fd, buf).map_err(io::Error::from)
    }
}

impl Read for FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (self as &Self).read(buf)
    }
}

impl Write for &FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        write(self.fd, buf).map_err(io::Error::from)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Write for FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (self as &Self).write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        (self as &Self).flush()
    }
}

impl Drop for FileDescriptor {
    fn drop(&mut self) {
        let _ = close(self.fd);
    }
}
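// Put the file descriptor into non-blocking mode via `fcntl`.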
fn set_nonblocking(fd: RawFd) {
    use nix::fcntl::{OFlag, F_GETFL, F_SETFL};

    let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFL)");

    if flags < 0 {
        panic!(
            "bad return value from fcntl(F_GETFL): {} ({:?})",
            flags,
            nix::Error::last()
        );
    }

    let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;

    nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFL)");
}
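// Create a connected pair of Unix stream sockets and switch both ends to
// non-blocking mode.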
fn socketpair() -> (FileDescriptor, FileDescriptor) {
    use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};

    let (fd_a, fd_b) = socket::socketpair(
        AddressFamily::Unix,
        SockType::Stream,
        None,
        SockFlag::empty(),
    )
    .expect("socketpair");
    let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });

    set_nonblocking(fds.0.fd);
    set_nonblocking(fds.1.fd);

    fds
}
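// Read from the descriptor until it reports `WouldBlock`, emptying any
// buffered data on this side of the socket.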
fn drain(mut fd: &FileDescriptor) {
    let mut buf = [0u8; 512];

    loop {
        match fd.read(&mut buf[..]) {
            Err(e) if e.kind() == ErrorKind::WouldBlock => break,
            Ok(0) => panic!("unexpected EOF"),
            Err(e) => panic!("unexpected error: {:?}", e),
            Ok(_) => continue,
        }
    }
}
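// A freshly created socket pair should be writable on both ends, but neither
// end should be readable.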
#[tokio::test]
async fn initially_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let afd_b = AsyncFd::new(b).unwrap();

    afd_a.writable().await.unwrap().clear_ready();
    afd_b.writable().await.unwrap().clear_ready();

    tokio::select! {
        biased;
        _ = tokio::time::sleep(Duration::from_millis(10)) => {},
        _ = afd_a.readable() => panic!("Unexpected readable state"),
        _ = afd_b.readable() => panic!("Unexpected readable state"),
    }
}
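// Exercises `clear_ready`/`retain_ready` on the read side: readiness stays set
// until it is explicitly cleared, and a new readable event is observable
// afterwards.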
#[tokio::test]
async fn reset_readable() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    let mut guard = readable.await.unwrap();

    guard
        .try_io(|_| afd_a.get_ref().read(&mut [0]))
        .unwrap()
        .unwrap();

    // `a` is not readable, but the reactor still thinks it is
    // (because we have not observed a not-ready error yet)
    afd_a.readable().await.unwrap().retain_ready();

    // Explicitly clear the ready state
    guard.clear_ready();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    // We can observe the new readable event
    afd_a.readable().await.unwrap().clear_ready();
}
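// Exercises clearing write readiness: after filling the socket buffer the fd
// must not report writable until the peer drains it.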
#[tokio::test]
async fn reset_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.writable().await.unwrap();

    // Write until we get a WouldBlock. This also clears the ready state.
    while guard
        .try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
        .is_ok()
    {}

    // Writable state should be cleared now.
    let writable = afd_a.writable();
    tokio::pin!(writable);

    tokio::select! {
        _ = writable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Read from the other side; we should become writable now.
    drain(&b);

    let _ = writable.await.unwrap();
}
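// `AsRawFd` wrapper that shares ownership of the inner fd through an `Arc`,
// used to check that closing on drop is delegated to the inner object.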
#[derive(Debug)]
struct ArcFd<T>(Arc<T>);
impl<T: AsRawFd> AsRawFd for ArcFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}
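// Dropping an `AsyncFd` closes the fd (via the inner object's `Drop`), while
// `into_inner` hands the fd back without closing it.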
#[tokio::test]
async fn drop_closes() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(afd_a);

    assert_eq!(0, b.read(&mut [0]).unwrap());

    // into_inner does not close the fd

    let (a, mut b) = socketpair();
    let afd_a = AsyncFd::new(a).unwrap();
    let _a: FileDescriptor = afd_a.into_inner();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    // Drop closure behavior is delegated to the inner object
    let (a, mut b) = socketpair();
    let arc_fd = Arc::new(a);
    let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
    std::mem::drop(afd_a);

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
}
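// An fd recovered with `into_inner` can be registered with `AsyncFd` again.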
#[tokio::test]
async fn reregister() {
    let (a, _b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let a = afd_a.into_inner();
    AsyncFd::new(a).unwrap();
}
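// `try_io` should leave readiness untouched unless the closure returns
// `WouldBlock`, in which case the ready state is cleared.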
#[tokio::test]
async fn try_io() {
    let (a, mut b) = socketpair();

    b.write_all(b"0").unwrap();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.readable().await.unwrap();

    afd_a.get_ref().read_exact(&mut [0]).unwrap();

    // Should not clear the readable state
    let _ = guard.try_io(|_| Ok(()));

    // Still readable...
    let _ = afd_a.readable().await.unwrap();

    // Should clear the readable state
    let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));

    // Assert not readable
    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Write something to b again and make sure we're awoken
    b.write_all(b"0").unwrap();
    let _ = readable.await.unwrap();
}
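// Several tasks waiting on the same `AsyncFd` should all be woken when the fd
// becomes readable.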
#[tokio::test]
async fn multiple_waiters() {
    let (a, mut b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());

    let barrier = Arc::new(tokio::sync::Barrier::new(11));

    let mut tasks = Vec::new();
    for _ in 0..10 {
        let afd_a = afd_a.clone();
        let barrier = barrier.clone();

        let f = async move {
            let notify_barrier = async {
                barrier.wait().await;
                futures::future::pending::<()>().await;
            };

            tokio::select! {
                biased;
                guard = afd_a.readable() => {
                    tokio::task::yield_now().await;
                    guard.unwrap().clear_ready()
                },
                _ = notify_barrier => unreachable!(),
            }

            std::mem::drop(afd_a);
        };

        tasks.push(tokio::spawn(f));
    }

    let mut all_tasks = futures::future::try_join_all(tasks);

    tokio::select! {
        r = std::pin::Pin::new(&mut all_tasks) => {
            r.unwrap(); // propagate panic
            panic!("Tasks exited unexpectedly")
        },
        _ = barrier.wait() => {}
    };

    b.write_all(b"0").unwrap();

    all_tasks.await.unwrap();
}
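// Exercises the low-level `poll_read_ready`/`poll_write_ready` APIs directly,
// including the fact that only the most recently registered waker is woken.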
#[tokio::test]
async fn poll_fns() {
    let (a, b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());
    let afd_b = Arc::new(AsyncFd::new(b).unwrap());

    // Fill up the write side of A
    while afd_a.get_ref().write(&[0; 512]).is_ok() {}

    let waker = TestWaker::new();

    assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));

    let afd_a_2 = afd_a.clone();
    let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = r_barrier.clone();

    let read_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_read_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
    });

    let afd_a_2 = afd_a.clone();
    let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = w_barrier.clone();

    let mut write_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_write_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
    });

    r_barrier.wait().await;
    w_barrier.wait().await;

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = &mut readable => unreachable!(),
        _ = tokio::task::yield_now() => {}
    }

    // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
    afd_b.get_ref().write_all(b"0").unwrap();

    let _ = tokio::join!(readable, read_fut);

    // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
    assert!(!waker.awoken());

    // The writable side should not be awoken
    tokio::select! {
        _ = &mut write_fut => unreachable!(),
        _ = tokio::time::sleep(Duration::from_millis(5)) => {}
    }

    // Make it writable now
    drain(afd_b.get_ref());

    // Now we should be writable (i.e. the waker for poll_write should still be registered after waking the read side)
    let _ = write_fut.await;
}
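// Pin the future on the heap and assert that it is still pending, returning it
// for later completion.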
fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
    let mut pinned = Box::pin(f);

    assert_pending!(pinned
        .as_mut()
        .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));

    pinned
}
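// Build a current-thread runtime for the shutdown tests below.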
fn rt() -> tokio::runtime::Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}
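// The remaining tests verify that readiness futures and poll calls are woken
// when the runtime that owns the I/O driver is shut down, and that subsequent
// attempts return an error.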
#[test]
fn driver_shutdown_wakes_currently_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    let readable = assert_pending(afd_a.readable());

    std::mem::drop(rt);

    // The future was initialized **before** dropping the rt
    assert_err!(futures::executor::block_on(readable));

    // The future is initialized **after** dropping the rt.
    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_future_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(afd_a.readable());

        // However, retrying will always return an error
        assert_err!(futures::executor::block_on(afd_a.readable()));
    }
}
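// Helpers that wrap the poll-based readiness APIs in futures.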
async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
}

async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
}

#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

    let readable = assert_pending(poll_readable(&afd_a));
    let writable = assert_pending(poll_writable(&afd_a));

    std::mem::drop(rt);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(readable));
    assert_err!(futures::executor::block_on(writable));
}

#[test]
fn driver_shutdown_wakes_poll() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
    assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
}

#[test]
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // The poll variants will always return an error in this case
        assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
        assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
    }
}