1 #![warn(rust_2018_idioms)]
2 #![cfg(all(unix, feature = "full"))]
3
4 use std::os::unix::io::{AsRawFd, RawFd};
5 use std::sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc,
8 };
9 use std::time::Duration;
10 use std::{
11 future::Future,
12 io::{self, ErrorKind, Read, Write},
13 task::{Context, Waker},
14 };
15
16 use nix::unistd::{close, read, write};
17
18 use futures::poll;
19
20 use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
21 use tokio_test::{assert_err, assert_pending};
22
23 struct TestWaker {
24 inner: Arc<TestWakerInner>,
25 waker: Waker,
26 }
27
28 #[derive(Default)]
29 struct TestWakerInner {
30 awoken: AtomicBool,
31 }
32
33 impl futures::task::ArcWake for TestWakerInner {
wake_by_ref(arc_self: &Arc<Self>)34 fn wake_by_ref(arc_self: &Arc<Self>) {
35 arc_self.awoken.store(true, Ordering::SeqCst);
36 }
37 }
38
39 impl TestWaker {
new() -> Self40 fn new() -> Self {
41 let inner: Arc<TestWakerInner> = Default::default();
42
43 Self {
44 inner: inner.clone(),
45 waker: futures::task::waker(inner),
46 }
47 }
48
awoken(&self) -> bool49 fn awoken(&self) -> bool {
50 self.inner.awoken.swap(false, Ordering::SeqCst)
51 }
52
context(&self) -> Context<'_>53 fn context(&self) -> Context<'_> {
54 Context::from_waker(&self.waker)
55 }
56 }
57
58 #[derive(Debug)]
59 struct FileDescriptor {
60 fd: RawFd,
61 }
62
63 impl AsRawFd for FileDescriptor {
as_raw_fd(&self) -> RawFd64 fn as_raw_fd(&self) -> RawFd {
65 self.fd
66 }
67 }
68
69 impl Read for &FileDescriptor {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>70 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
71 read(self.fd, buf).map_err(io::Error::from)
72 }
73 }
74
75 impl Read for FileDescriptor {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>76 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
77 (self as &Self).read(buf)
78 }
79 }
80
81 impl Write for &FileDescriptor {
write(&mut self, buf: &[u8]) -> io::Result<usize>82 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
83 write(self.fd, buf).map_err(io::Error::from)
84 }
85
flush(&mut self) -> io::Result<()>86 fn flush(&mut self) -> io::Result<()> {
87 Ok(())
88 }
89 }
90
91 impl Write for FileDescriptor {
write(&mut self, buf: &[u8]) -> io::Result<usize>92 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
93 (self as &Self).write(buf)
94 }
95
flush(&mut self) -> io::Result<()>96 fn flush(&mut self) -> io::Result<()> {
97 (self as &Self).flush()
98 }
99 }
100
101 impl Drop for FileDescriptor {
drop(&mut self)102 fn drop(&mut self) {
103 let _ = close(self.fd);
104 }
105 }
106
set_nonblocking(fd: RawFd)107 fn set_nonblocking(fd: RawFd) {
108 use nix::fcntl::{OFlag, F_GETFL, F_SETFL};
109
110 let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)");
111
112 if flags < 0 {
113 panic!(
114 "bad return value from fcntl(F_GETFL): {} ({:?})",
115 flags,
116 nix::Error::last()
117 );
118 }
119
120 let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
121
122 nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)");
123 }
124
socketpair() -> (FileDescriptor, FileDescriptor)125 fn socketpair() -> (FileDescriptor, FileDescriptor) {
126 use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};
127
128 let (fd_a, fd_b) = socket::socketpair(
129 AddressFamily::Unix,
130 SockType::Stream,
131 None,
132 SockFlag::empty(),
133 )
134 .expect("socketpair");
135 let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });
136
137 set_nonblocking(fds.0.fd);
138 set_nonblocking(fds.1.fd);
139
140 fds
141 }
142
drain(mut fd: &FileDescriptor)143 fn drain(mut fd: &FileDescriptor) {
144 let mut buf = [0u8; 512];
145
146 loop {
147 match fd.read(&mut buf[..]) {
148 Err(e) if e.kind() == ErrorKind::WouldBlock => break,
149 Ok(0) => panic!("unexpected EOF"),
150 Err(e) => panic!("unexpected error: {:?}", e),
151 Ok(_) => continue,
152 }
153 }
154 }
155
156 #[tokio::test]
initially_writable()157 async fn initially_writable() {
158 let (a, b) = socketpair();
159
160 let afd_a = AsyncFd::new(a).unwrap();
161 let afd_b = AsyncFd::new(b).unwrap();
162
163 afd_a.writable().await.unwrap().clear_ready();
164 afd_b.writable().await.unwrap().clear_ready();
165
166 tokio::select! {
167 biased;
168 _ = tokio::time::sleep(Duration::from_millis(10)) => {},
169 _ = afd_a.readable() => panic!("Unexpected readable state"),
170 _ = afd_b.readable() => panic!("Unexpected readable state"),
171 }
172 }
173
174 #[tokio::test]
reset_readable()175 async fn reset_readable() {
176 let (a, mut b) = socketpair();
177
178 let afd_a = AsyncFd::new(a).unwrap();
179
180 let readable = afd_a.readable();
181 tokio::pin!(readable);
182
183 tokio::select! {
184 _ = readable.as_mut() => panic!(),
185 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
186 }
187
188 b.write_all(b"0").unwrap();
189
190 let mut guard = readable.await.unwrap();
191
192 guard
193 .try_io(|_| afd_a.get_ref().read(&mut [0]))
194 .unwrap()
195 .unwrap();
196
197 // `a` is not readable, but the reactor still thinks it is
198 // (because we have not observed a not-ready error yet)
199 afd_a.readable().await.unwrap().retain_ready();
200
201 // Explicitly clear the ready state
202 guard.clear_ready();
203
204 let readable = afd_a.readable();
205 tokio::pin!(readable);
206
207 tokio::select! {
208 _ = readable.as_mut() => panic!(),
209 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
210 }
211
212 b.write_all(b"0").unwrap();
213
214 // We can observe the new readable event
215 afd_a.readable().await.unwrap().clear_ready();
216 }
217
218 #[tokio::test]
reset_writable()219 async fn reset_writable() {
220 let (a, b) = socketpair();
221
222 let afd_a = AsyncFd::new(a).unwrap();
223
224 let mut guard = afd_a.writable().await.unwrap();
225
226 // Write until we get a WouldBlock. This also clears the ready state.
227 while guard
228 .try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
229 .is_ok()
230 {}
231
232 // Writable state should be cleared now.
233 let writable = afd_a.writable();
234 tokio::pin!(writable);
235
236 tokio::select! {
237 _ = writable.as_mut() => panic!(),
238 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
239 }
240
241 // Read from the other side; we should become writable now.
242 drain(&b);
243
244 let _ = writable.await.unwrap();
245 }
246
247 #[derive(Debug)]
248 struct ArcFd<T>(Arc<T>);
249 impl<T: AsRawFd> AsRawFd for ArcFd<T> {
as_raw_fd(&self) -> RawFd250 fn as_raw_fd(&self) -> RawFd {
251 self.0.as_raw_fd()
252 }
253 }
254
255 #[tokio::test]
drop_closes()256 async fn drop_closes() {
257 let (a, mut b) = socketpair();
258
259 let afd_a = AsyncFd::new(a).unwrap();
260
261 assert_eq!(
262 ErrorKind::WouldBlock,
263 b.read(&mut [0]).err().unwrap().kind()
264 );
265
266 std::mem::drop(afd_a);
267
268 assert_eq!(0, b.read(&mut [0]).unwrap());
269
270 // into_inner does not close the fd
271
272 let (a, mut b) = socketpair();
273 let afd_a = AsyncFd::new(a).unwrap();
274 let _a: FileDescriptor = afd_a.into_inner();
275
276 assert_eq!(
277 ErrorKind::WouldBlock,
278 b.read(&mut [0]).err().unwrap().kind()
279 );
280
281 // Drop closure behavior is delegated to the inner object
282 let (a, mut b) = socketpair();
283 let arc_fd = Arc::new(a);
284 let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
285 std::mem::drop(afd_a);
286
287 assert_eq!(
288 ErrorKind::WouldBlock,
289 b.read(&mut [0]).err().unwrap().kind()
290 );
291
292 std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
293 }
294
295 #[tokio::test]
reregister()296 async fn reregister() {
297 let (a, _b) = socketpair();
298
299 let afd_a = AsyncFd::new(a).unwrap();
300 let a = afd_a.into_inner();
301 AsyncFd::new(a).unwrap();
302 }
303
304 #[tokio::test]
try_io()305 async fn try_io() {
306 let (a, mut b) = socketpair();
307
308 b.write_all(b"0").unwrap();
309
310 let afd_a = AsyncFd::new(a).unwrap();
311
312 let mut guard = afd_a.readable().await.unwrap();
313
314 afd_a.get_ref().read_exact(&mut [0]).unwrap();
315
316 // Should not clear the readable state
317 let _ = guard.try_io(|_| Ok(()));
318
319 // Still readable...
320 let _ = afd_a.readable().await.unwrap();
321
322 // Should clear the readable state
323 let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));
324
325 // Assert not readable
326 let readable = afd_a.readable();
327 tokio::pin!(readable);
328
329 tokio::select! {
330 _ = readable.as_mut() => panic!(),
331 _ = tokio::time::sleep(Duration::from_millis(10)) => {}
332 }
333
334 // Write something down b again and make sure we're reawoken
335 b.write_all(b"0").unwrap();
336 let _ = readable.await.unwrap();
337 }
338
339 #[tokio::test]
multiple_waiters()340 async fn multiple_waiters() {
341 let (a, mut b) = socketpair();
342 let afd_a = Arc::new(AsyncFd::new(a).unwrap());
343
344 let barrier = Arc::new(tokio::sync::Barrier::new(11));
345
346 let mut tasks = Vec::new();
347 for _ in 0..10 {
348 let afd_a = afd_a.clone();
349 let barrier = barrier.clone();
350
351 let f = async move {
352 let notify_barrier = async {
353 barrier.wait().await;
354 futures::future::pending::<()>().await;
355 };
356
357 tokio::select! {
358 biased;
359 guard = afd_a.readable() => {
360 tokio::task::yield_now().await;
361 guard.unwrap().clear_ready()
362 },
363 _ = notify_barrier => unreachable!(),
364 }
365
366 std::mem::drop(afd_a);
367 };
368
369 tasks.push(tokio::spawn(f));
370 }
371
372 let mut all_tasks = futures::future::try_join_all(tasks);
373
374 tokio::select! {
375 r = std::pin::Pin::new(&mut all_tasks) => {
376 r.unwrap(); // propagate panic
377 panic!("Tasks exited unexpectedly")
378 },
379 _ = barrier.wait() => {}
380 };
381
382 b.write_all(b"0").unwrap();
383
384 all_tasks.await.unwrap();
385 }
386
387 #[tokio::test]
poll_fns()388 async fn poll_fns() {
389 let (a, b) = socketpair();
390 let afd_a = Arc::new(AsyncFd::new(a).unwrap());
391 let afd_b = Arc::new(AsyncFd::new(b).unwrap());
392
393 // Fill up the write side of A
394 while afd_a.get_ref().write(&[0; 512]).is_ok() {}
395
396 let waker = TestWaker::new();
397
398 assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));
399
400 let afd_a_2 = afd_a.clone();
401 let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
402 let barrier_clone = r_barrier.clone();
403
404 let read_fut = tokio::spawn(async move {
405 // Move waker onto this task first
406 assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
407 .as_ref()
408 .poll_read_ready(cx))));
409 barrier_clone.wait().await;
410
411 let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
412 });
413
414 let afd_a_2 = afd_a.clone();
415 let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
416 let barrier_clone = w_barrier.clone();
417
418 let mut write_fut = tokio::spawn(async move {
419 // Move waker onto this task first
420 assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
421 .as_ref()
422 .poll_write_ready(cx))));
423 barrier_clone.wait().await;
424
425 let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
426 });
427
428 r_barrier.wait().await;
429 w_barrier.wait().await;
430
431 let readable = afd_a.readable();
432 tokio::pin!(readable);
433
434 tokio::select! {
435 _ = &mut readable => unreachable!(),
436 _ = tokio::task::yield_now() => {}
437 }
438
439 // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
440 afd_b.get_ref().write_all(b"0").unwrap();
441
442 let _ = tokio::join!(readable, read_fut);
443
444 // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
445 assert!(!waker.awoken());
446
447 // The writable side should not be awoken
448 tokio::select! {
449 _ = &mut write_fut => unreachable!(),
450 _ = tokio::time::sleep(Duration::from_millis(5)) => {}
451 }
452
453 // Make it writable now
454 drain(afd_b.get_ref());
455
456 // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
457 let _ = write_fut.await;
458 }
459
assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>>460 fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
461 let mut pinned = Box::pin(f);
462
463 assert_pending!(pinned
464 .as_mut()
465 .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));
466
467 pinned
468 }
469
rt() -> tokio::runtime::Runtime470 fn rt() -> tokio::runtime::Runtime {
471 tokio::runtime::Builder::new_current_thread()
472 .enable_all()
473 .build()
474 .unwrap()
475 }
476
477 #[test]
driver_shutdown_wakes_currently_pending()478 fn driver_shutdown_wakes_currently_pending() {
479 let rt = rt();
480
481 let (a, _b) = socketpair();
482 let afd_a = {
483 let _enter = rt.enter();
484 AsyncFd::new(a).unwrap()
485 };
486
487 let readable = assert_pending(afd_a.readable());
488
489 std::mem::drop(rt);
490
491 // The future was initialized **before** dropping the rt
492 assert_err!(futures::executor::block_on(readable));
493
494 // The future is initialized **after** dropping the rt.
495 assert_err!(futures::executor::block_on(afd_a.readable()));
496 }
497
498 #[test]
driver_shutdown_wakes_future_pending()499 fn driver_shutdown_wakes_future_pending() {
500 let rt = rt();
501
502 let (a, _b) = socketpair();
503 let afd_a = {
504 let _enter = rt.enter();
505 AsyncFd::new(a).unwrap()
506 };
507
508 std::mem::drop(rt);
509
510 assert_err!(futures::executor::block_on(afd_a.readable()));
511 }
512
513 #[test]
driver_shutdown_wakes_pending_race()514 fn driver_shutdown_wakes_pending_race() {
515 // TODO: make this a loom test
516 for _ in 0..100 {
517 let rt = rt();
518
519 let (a, _b) = socketpair();
520 let afd_a = {
521 let _enter = rt.enter();
522 AsyncFd::new(a).unwrap()
523 };
524
525 let _ = std::thread::spawn(move || std::mem::drop(rt));
526
527 // This may or may not return an error (but will be awoken)
528 let _ = futures::executor::block_on(afd_a.readable());
529
530 // However retrying will always return an error
531 assert_err!(futures::executor::block_on(afd_a.readable()));
532 }
533 }
534
poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>>535 async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
536 futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
537 }
538
poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>>539 async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
540 futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
541 }
542
543 #[test]
driver_shutdown_wakes_currently_pending_polls()544 fn driver_shutdown_wakes_currently_pending_polls() {
545 let rt = rt();
546
547 let (a, _b) = socketpair();
548 let afd_a = {
549 let _enter = rt.enter();
550 AsyncFd::new(a).unwrap()
551 };
552
553 while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable
554
555 let readable = assert_pending(poll_readable(&afd_a));
556 let writable = assert_pending(poll_writable(&afd_a));
557
558 std::mem::drop(rt);
559
560 // Attempting to poll readiness when the rt is dropped is an error
561 assert_err!(futures::executor::block_on(readable));
562 assert_err!(futures::executor::block_on(writable));
563 }
564
565 #[test]
driver_shutdown_wakes_poll()566 fn driver_shutdown_wakes_poll() {
567 let rt = rt();
568
569 let (a, _b) = socketpair();
570 let afd_a = {
571 let _enter = rt.enter();
572 AsyncFd::new(a).unwrap()
573 };
574
575 std::mem::drop(rt);
576
577 assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
578 assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
579 }
580
581 #[test]
driver_shutdown_wakes_poll_race()582 fn driver_shutdown_wakes_poll_race() {
583 // TODO: make this a loom test
584 for _ in 0..100 {
585 let rt = rt();
586
587 let (a, _b) = socketpair();
588 let afd_a = {
589 let _enter = rt.enter();
590 AsyncFd::new(a).unwrap()
591 };
592
593 while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable
594
595 let _ = std::thread::spawn(move || std::mem::drop(rt));
596
597 // The poll variants will always return an error in this case
598 assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
599 assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
600 }
601 }
602