• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![warn(rust_2018_idioms)]
2 #![cfg(all(unix, feature = "full"))]
3 
4 use std::os::unix::io::{AsRawFd, RawFd};
5 use std::sync::{
6     atomic::{AtomicBool, Ordering},
7     Arc,
8 };
9 use std::time::Duration;
10 use std::{
11     future::Future,
12     io::{self, ErrorKind, Read, Write},
13     task::{Context, Waker},
14 };
15 
16 use nix::unistd::{read, write};
17 
18 use futures::poll;
19 
20 use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
21 use tokio_test::{assert_err, assert_pending};
22 
/// Test helper pairing a `Waker` with the shared state that records whether
/// that waker has been invoked.
struct TestWaker {
    /// Shared flag holder; `waker` is built from a clone of this `Arc`.
    inner: Arc<TestWakerInner>,
    /// Waker produced by `futures::task::waker` from `inner`.
    waker: Waker,
}
27 
/// Shared state behind a `TestWaker`.
#[derive(Default)]
struct TestWakerInner {
    /// Set to `true` by `wake_by_ref`; read and reset by `TestWaker::awoken`.
    awoken: AtomicBool,
}
32 
impl futures::task::ArcWake for TestWakerInner {
    /// Records the wake-up by setting the `awoken` flag.
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.awoken.store(true, Ordering::SeqCst);
    }
}
38 
39 impl TestWaker {
new() -> Self40     fn new() -> Self {
41         let inner: Arc<TestWakerInner> = Default::default();
42 
43         Self {
44             inner: inner.clone(),
45             waker: futures::task::waker(inner),
46         }
47     }
48 
awoken(&self) -> bool49     fn awoken(&self) -> bool {
50         self.inner.awoken.swap(false, Ordering::SeqCst)
51     }
52 
context(&self) -> Context<'_>53     fn context(&self) -> Context<'_> {
54         Context::from_waker(&self.waker)
55     }
56 }
57 
/// Thin wrapper around an owned file descriptor, used as the inner value of
/// `AsyncFd` throughout these tests.
#[derive(Debug)]
struct FileDescriptor {
    /// The owned descriptor.
    fd: std::os::fd::OwnedFd,
}
62 
63 impl AsRawFd for FileDescriptor {
as_raw_fd(&self) -> RawFd64     fn as_raw_fd(&self) -> RawFd {
65         self.fd.as_raw_fd()
66     }
67 }
68 
69 impl Read for &FileDescriptor {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>70     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
71         read(self.fd.as_raw_fd(), buf).map_err(io::Error::from)
72     }
73 }
74 
75 impl Read for FileDescriptor {
read(&mut self, buf: &mut [u8]) -> io::Result<usize>76     fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
77         (self as &Self).read(buf)
78     }
79 }
80 
81 impl Write for &FileDescriptor {
write(&mut self, buf: &[u8]) -> io::Result<usize>82     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
83         write(&self.fd, buf).map_err(io::Error::from)
84     }
85 
flush(&mut self) -> io::Result<()>86     fn flush(&mut self) -> io::Result<()> {
87         Ok(())
88     }
89 }
90 
91 impl Write for FileDescriptor {
write(&mut self, buf: &[u8]) -> io::Result<usize>92     fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
93         (self as &Self).write(buf)
94     }
95 
flush(&mut self) -> io::Result<()>96     fn flush(&mut self) -> io::Result<()> {
97         (self as &Self).flush()
98     }
99 }
100 
set_nonblocking(fd: RawFd)101 fn set_nonblocking(fd: RawFd) {
102     use nix::fcntl::{OFlag, F_GETFL, F_SETFL};
103 
104     let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFD)");
105 
106     if flags < 0 {
107         panic!(
108             "bad return value from fcntl(F_GETFL): {} ({:?})",
109             flags,
110             nix::Error::last()
111         );
112     }
113 
114     let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
115 
116     nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFD)");
117 }
118 
socketpair() -> (FileDescriptor, FileDescriptor)119 fn socketpair() -> (FileDescriptor, FileDescriptor) {
120     use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};
121 
122     let (fd_a, fd_b) = socket::socketpair(
123         AddressFamily::Unix,
124         SockType::Stream,
125         None,
126         SockFlag::empty(),
127     )
128     .expect("socketpair");
129     let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });
130 
131     set_nonblocking(fds.0.fd.as_raw_fd());
132     set_nonblocking(fds.1.fd.as_raw_fd());
133 
134     fds
135 }
136 
drain(mut fd: &FileDescriptor)137 fn drain(mut fd: &FileDescriptor) {
138     let mut buf = [0u8; 512];
139 
140     loop {
141         match fd.read(&mut buf[..]) {
142             Err(e) if e.kind() == ErrorKind::WouldBlock => break,
143             Ok(0) => panic!("unexpected EOF"),
144             Err(e) => panic!("unexpected error: {:?}", e),
145             Ok(_) => continue,
146         }
147     }
148 }
149 
/// Both ends of a fresh socket pair become writable, and neither becomes
/// readable within a 10 ms window.
#[tokio::test]
async fn initially_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let afd_b = AsyncFd::new(b).unwrap();

    // Awaiting writability must complete on both ends; the readiness is then cleared.
    afd_a.writable().await.unwrap().clear_ready();
    afd_b.writable().await.unwrap().clear_ready();

    // The test passes when the timer branch completes; either `readable`
    // branch completing is a failure.
    tokio::select! {
        biased;
        _ = tokio::time::sleep(Duration::from_millis(10)) => {},
        _ = afd_a.readable() => panic!("Unexpected readable state"),
        _ = afd_b.readable() => panic!("Unexpected readable state"),
    }
}
167 
/// Walks read readiness on `a` through: not ready, ready after a write from
/// `b`, explicitly cleared, and ready again after a second write.
#[tokio::test]
async fn reset_readable() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    // Nothing has been written yet, so the 10 ms timer must win.
    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    // The same pinned future that was pending above now resolves.
    let mut guard = readable.await.unwrap();

    // Consume the single byte through the guard.
    guard
        .try_io(|_| afd_a.get_ref().read(&mut [0]))
        .unwrap()
        .unwrap();

    // `a` is not readable, but the reactor still thinks it is
    // (because we have not observed a not-ready error yet)
    afd_a.readable().await.unwrap().retain_ready();

    // Explicitly clear the ready state
    guard.clear_ready();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    // After the explicit clear the timer must win again.
    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    // We can observe the new readable event
    afd_a.readable().await.unwrap().clear_ready();
}
211 
/// Fills `a`'s send side until writes would block, checks that writability is
/// no longer reported, then drains `b` and expects writability to return.
#[tokio::test]
async fn reset_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.writable().await.unwrap();

    // Write until we get a WouldBlock. This also clears the ready state.
    while guard
        .try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
        .is_ok()
    {}

    // Writable state should be cleared now.
    let writable = afd_a.writable();
    tokio::pin!(writable);

    // The 10 ms timer must win while the peer has not read anything.
    tokio::select! {
        _ = writable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Read from the other side; we should become writable now.
    drain(&b);

    let _ = writable.await.unwrap();
}
240 
/// Newtype around a shared (`Arc`) descriptor-like value, so that dropping the
/// wrapper does not by itself drop the wrapped value.
#[derive(Debug)]
struct ArcFd<T>(Arc<T>);

impl<T> AsRawFd for ArcFd<T>
where
    T: AsRawFd,
{
    /// Returns the raw descriptor of the shared inner value.
    fn as_raw_fd(&self) -> RawFd {
        let ArcFd(shared) = self;
        T::as_raw_fd(&**shared)
    }
}
248 
/// Checks when the wrapped descriptor gets closed: on dropping the `AsyncFd`,
/// but not on `into_inner`, and not when the inner value is still shared
/// through another `Arc` handle.
#[tokio::test]
async fn drop_closes() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    // While `a` is open and silent, reading from `b` reports WouldBlock.
    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(afd_a);

    // A zero-length read on `b` means its peer was closed by the drop above.
    assert_eq!(0, b.read(&mut [0]).unwrap());

    // into_inner does not close the fd

    let (a, mut b) = socketpair();
    let afd_a = AsyncFd::new(a).unwrap();
    let _a: FileDescriptor = afd_a.into_inner();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    // Drop closure behavior is delegated to the inner object
    let (a, mut b) = socketpair();
    let arc_fd = Arc::new(a);
    let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
    std::mem::drop(afd_a);

    // `arc_fd` still holds the descriptor, so `b` sees no EOF.
    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
}
288 
289 #[tokio::test]
reregister()290 async fn reregister() {
291     let (a, _b) = socketpair();
292 
293     let afd_a = AsyncFd::new(a).unwrap();
294     let a = afd_a.into_inner();
295     AsyncFd::new(a).unwrap();
296 }
297 
/// `try_io` leaves read readiness alone when the closure succeeds and clears
/// it when the closure returns `WouldBlock`.
#[tokio::test]
async fn try_io() {
    let (a, mut b) = socketpair();

    b.write_all(b"0").unwrap();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.readable().await.unwrap();

    // Consume the pending byte directly, bypassing the guard.
    afd_a.get_ref().read_exact(&mut [0]).unwrap();

    // Should not clear the readable state
    let _ = guard.try_io(|_| Ok(()));

    // Still readable...
    let _ = afd_a.readable().await.unwrap();

    // Should clear the readable state
    let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));

    // Assert not readable
    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Write something down b again and make sure we're reawoken
    b.write_all(b"0").unwrap();
    let _ = readable.await.unwrap();
}
332 
/// Ten spawned tasks wait for readability on one shared `AsyncFd`; a single
/// write on the peer must let all of them finish.
#[tokio::test]
async fn multiple_waiters() {
    let (a, mut b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());

    // 11 participants: the ten tasks spawned below plus this test body.
    let barrier = Arc::new(tokio::sync::Barrier::new(11));

    let mut tasks = Vec::new();
    for _ in 0..10 {
        let afd_a = afd_a.clone();
        let barrier = barrier.clone();

        let f = async move {
            // Reaches the barrier and then never completes, so the second
            // `select!` branch below can never be taken.
            let notify_barrier = async {
                barrier.wait().await;
                futures::future::pending::<()>().await;
            };

            tokio::select! {
                biased;
                guard = afd_a.readable() => {
                    tokio::task::yield_now().await;
                    guard.unwrap().clear_ready()
                },
                _ = notify_barrier => unreachable!(),
            }

            std::mem::drop(afd_a);
        };

        tasks.push(tokio::spawn(f));
    }

    let mut all_tasks = futures::future::try_join_all(tasks);

    // Wait for every task to reach the barrier; no task may finish before
    // anything has been written.
    tokio::select! {
        r = std::pin::Pin::new(&mut all_tasks) => {
            r.unwrap(); // propagate panic
            panic!("Tasks exited unexpectedly")
        },
        _ = barrier.wait() => {}
    };

    b.write_all(b"0").unwrap();

    all_tasks.await.unwrap();
}
380 
/// Exercises `poll_read_ready` / `poll_write_ready`: the most recent context
/// per direction receives the wake-up, and waking the read side leaves the
/// write-side registration in place.
#[tokio::test]
async fn poll_fns() {
    let (a, b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());
    let afd_b = Arc::new(AsyncFd::new(b).unwrap());

    // Fill up the write side of A
    while afd_a.get_ref().write(&[0; 512]).is_ok() {}

    let waker = TestWaker::new();

    // Register the test waker for read readiness; it is expected to be
    // replaced by the spawned task's waker below.
    assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));

    let afd_a_2 = afd_a.clone();
    let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = r_barrier.clone();

    let read_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_read_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
    });

    let afd_a_2 = afd_a.clone();
    let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = w_barrier.clone();

    let mut write_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_write_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
    });

    // Both tasks have completed their first poll once these barriers release.
    r_barrier.wait().await;
    w_barrier.wait().await;

    let readable = afd_a.readable();
    tokio::pin!(readable);

    // `a` has nothing to read yet, so `readable` must not complete here.
    tokio::select! {
        _ = &mut readable => unreachable!(),
        _ = tokio::task::yield_now() => {}
    }

    // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
    afd_b.get_ref().write_all(b"0").unwrap();

    let _ = tokio::join!(readable, read_fut);

    // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
    assert!(!waker.awoken());

    // The writable side should not be awoken
    tokio::select! {
        _ = &mut write_fut => unreachable!(),
        _ = tokio::time::sleep(Duration::from_millis(5)) => {}
    }

    // Make it writable now
    drain(afd_b.get_ref());

    // now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
    let _ = write_fut.await;
}
453 
assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>>454 fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
455     let mut pinned = Box::pin(f);
456 
457     assert_pending!(pinned
458         .as_mut()
459         .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));
460 
461     pinned
462 }
463 
rt() -> tokio::runtime::Runtime464 fn rt() -> tokio::runtime::Runtime {
465     tokio::runtime::Builder::new_current_thread()
466         .enable_all()
467         .build()
468         .unwrap()
469 }
470 
/// Dropping the runtime makes `readable()` resolve to an error, both for a
/// future that was already pending and for one created afterwards.
#[test]
fn driver_shutdown_wakes_currently_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    // Create the `AsyncFd` while the runtime context is entered.
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    // Poll once so the future is pending before the runtime goes away.
    let readable = assert_pending(afd_a.readable());

    std::mem::drop(rt);

    // The future was initialized **before** dropping the rt
    assert_err!(futures::executor::block_on(readable));

    // The future is initialized **after** dropping the rt.
    assert_err!(futures::executor::block_on(afd_a.readable()));
}
491 
/// A `readable()` future created after the runtime was dropped resolves to an error.
#[test]
fn driver_shutdown_wakes_future_pending() {
    let runtime = rt();

    let (a, _b) = socketpair();
    // Create the `AsyncFd` while the runtime context is entered.
    let afd_a = {
        let _guard = runtime.enter();
        AsyncFd::new(a).unwrap()
    };

    drop(runtime);

    let outcome = futures::executor::block_on(afd_a.readable());
    assert_err!(outcome);
}
506 
/// Races a `readable()` wait against the runtime being dropped on another
/// thread, repeated 100 times.
#[test]
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        // Drop the runtime on a separate thread so it races with the wait below.
        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(afd_a.readable());

        // However retrying will always return an error
        assert_err!(futures::executor::block_on(afd_a.readable()));
    }
}
528 
poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>>529 async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
530     futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
531 }
532 
poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>>533 async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
534     futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
535 }
536 
/// Poll-based read and write waits that were pending when the runtime was
/// dropped both resolve to an error.
#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

    // Poll each future once so both are pending before the runtime goes away.
    let readable = assert_pending(poll_readable(&afd_a));
    let writable = assert_pending(poll_writable(&afd_a));

    std::mem::drop(rt);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(readable));
    assert_err!(futures::executor::block_on(writable));
}
558 
/// Poll-based read and write waits started after the runtime was dropped both
/// resolve to an error.
#[test]
fn driver_shutdown_wakes_poll() {
    let runtime = rt();

    let (a, _b) = socketpair();
    // Create the `AsyncFd` while the runtime context is entered.
    let afd_a = {
        let _guard = runtime.enter();
        AsyncFd::new(a).unwrap()
    };

    drop(runtime);

    let read_outcome = futures::executor::block_on(poll_readable(&afd_a));
    assert_err!(read_outcome);

    let write_outcome = futures::executor::block_on(poll_writable(&afd_a));
    assert_err!(write_outcome);
}
574 
/// Races poll-based read and write waits against the runtime being dropped on
/// another thread, repeated 100 times.
#[test]
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

        // Drop the runtime on a separate thread so it races with the waits below.
        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // The poll variants will always return an error in this case
        assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
        assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
    }
}
596 
/// Sending out-of-band data on a TCP connection makes the peer, registered
/// with `Interest::PRIORITY`, report priority readiness.
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "android"))]
async fn priority_event_on_oob_data() {
    use std::net::SocketAddr;

    use tokio::io::Interest;

    // Port 0: the listener's actual address is read back via `local_addr` below.
    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();

    let listener = std::net::TcpListener::bind(addr).unwrap();
    let addr = listener.local_addr().unwrap();

    let client = std::net::TcpStream::connect(addr).unwrap();
    let client = AsyncFd::with_interest(client, Interest::PRIORITY).unwrap();

    let (stream, _) = listener.accept().unwrap();

    // Sending out of band data should trigger priority event.
    send_oob_data(&stream, b"hello").unwrap();

    let _ = client.ready(Interest::PRIORITY).await.unwrap();
}
619 
620 #[cfg(any(target_os = "linux", target_os = "android"))]
send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize>621 fn send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize> {
622     unsafe {
623         let res = libc::send(
624             stream.as_raw_fd(),
625             data.as_ptr().cast(),
626             data.len(),
627             libc::MSG_OOB,
628         );
629         if res == -1 {
630             Err(io::Error::last_os_error())
631         } else {
632             Ok(res as usize)
633         }
634     }
635 }
636 
/// `clear_ready_matching` removes only the requested readiness bits from a
/// guard obtained through `ready`.
#[tokio::test]
async fn clear_ready_matching_clears_ready() {
    use tokio::io::{Interest, Ready};

    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    // Give `a` something to read so the guard reports both directions.
    b.write_all(b"0").unwrap();

    let mut guard = afd_a
        .ready(Interest::READABLE | Interest::WRITABLE)
        .await
        .unwrap();

    assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);

    // Clearing one direction must leave the other one set.
    guard.clear_ready_matching(Ready::READABLE);
    assert_eq!(guard.ready(), Ready::WRITABLE);

    guard.clear_ready_matching(Ready::WRITABLE);
    assert_eq!(guard.ready(), Ready::EMPTY);
}
659 
/// Same check as `clear_ready_matching_clears_ready`, but with a guard
/// obtained through `ready_mut`.
#[tokio::test]
async fn clear_ready_matching_clears_ready_mut() {
    use tokio::io::{Interest, Ready};

    let (a, mut b) = socketpair();

    let mut afd_a = AsyncFd::new(a).unwrap();
    // Give `a` something to read so the guard reports both directions.
    b.write_all(b"0").unwrap();

    let mut guard = afd_a
        .ready_mut(Interest::READABLE | Interest::WRITABLE)
        .await
        .unwrap();

    assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);

    // Clearing one direction must leave the other one set.
    guard.clear_ready_matching(Ready::READABLE);
    assert_eq!(guard.ready(), Ready::WRITABLE);

    guard.clear_ready_matching(Ready::WRITABLE);
    assert_eq!(guard.ready(), Ready::EMPTY);
}
682 
/// A UDP socket reports no `ERROR` readiness before sending, and exactly
/// `Ready::ERROR` after a datagram has been sent.
#[tokio::test]
#[cfg(target_os = "linux")]
async fn await_error_readiness_timestamping() {
    use std::net::{Ipv4Addr, SocketAddr};

    use tokio::io::{Interest, Ready};

    let address_a = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let address_b = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));

    let socket = std::net::UdpSocket::bind(address_a).unwrap();

    socket.set_nonblocking(true).unwrap();

    // configure send timestamps
    configure_timestamping_socket(&socket).unwrap();

    socket.connect(address_b).unwrap();

    let fd = AsyncFd::new(socket).unwrap();

    // Nothing has been sent yet, so the 10 ms timer must win.
    tokio::select! {
        _ = fd.ready(Interest::ERROR) => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    let buf = b"hello there";
    fd.get_ref().send(buf).unwrap();

    // the send timestamp should now be in the error queue
    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}
716 
717 #[cfg(target_os = "linux")]
configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int>718 fn configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int> {
719     // enable software timestamping, and specifically software send timestamping
720     let options = libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_TX_SOFTWARE;
721 
722     let res = unsafe {
723         libc::setsockopt(
724             udp_socket.as_raw_fd(),
725             libc::SOL_SOCKET,
726             libc::SO_TIMESTAMP,
727             &options as *const _ as *const libc::c_void,
728             std::mem::size_of_val(&options) as libc::socklen_t,
729         )
730     };
731 
732     if res == -1 {
733         Err(std::io::Error::last_os_error())
734     } else {
735         Ok(res)
736     }
737 }
738 
/// With `IP_RECVERR` enabled, sending a datagram to a port nobody listens on
/// makes the sending socket report `Ready::ERROR`.
#[tokio::test]
#[cfg(target_os = "linux")]
async fn await_error_readiness_invalid_address() {
    use std::net::{Ipv4Addr, SocketAddr};
    use tokio::io::{Interest, Ready};

    let socket_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let socket = std::net::UdpSocket::bind(socket_addr).unwrap();
    // Raw descriptor captured for the spawned task; `socket` itself is moved
    // into the `AsyncFd` further down.
    let socket_fd = socket.as_raw_fd();

    // Enable IP_RECVERR option to receive error messages
    // https://man7.org/linux/man-pages/man7/ip.7.html has some extra information
    let recv_err: libc::c_int = 1;
    unsafe {
        let res = libc::setsockopt(
            socket.as_raw_fd(),
            libc::SOL_IP,
            libc::IP_RECVERR,
            &recv_err as *const _ as *const libc::c_void,
            std::mem::size_of_val(&recv_err) as libc::socklen_t,
        );
        if res == -1 {
            panic!("{:?}", std::io::Error::last_os_error());
        }
    }

    // Spawn a separate task for sending messages
    tokio::spawn(async move {
        // Set the destination address. This address is invalid in this context. The OS will notice
        // that nobody is listening on this port. Normally this is ignored (UDP is "fire and forget"),
        // but because IP_RECVERR is enabled, the error will actually be reported to the sending socket
        let mut dest_addr =
            unsafe { std::mem::MaybeUninit::<libc::sockaddr_in>::zeroed().assume_init() };
        dest_addr.sin_family = libc::AF_INET as _;
        // based on https://en.wikipedia.org/wiki/Ephemeral_port, we should pick a port number
        // below 1024 to guarantee that other tests don't select this port by accident when they
        // use port 0 to select an ephemeral port.
        dest_addr.sin_port = 512u16.to_be(); // Destination port

        // Prepare the message data
        let message = "Hello, Socket!";

        // Prepare the message structure for sendmsg
        let mut iov = libc::iovec {
            iov_base: message.as_ptr() as *mut libc::c_void,
            iov_len: message.len(),
        };

        // Prepare the destination address for the sendmsg call
        let dest_sockaddr: *const libc::sockaddr = &dest_addr as *const _ as *const libc::sockaddr;
        let dest_addrlen: libc::socklen_t = std::mem::size_of_val(&dest_addr) as libc::socklen_t;

        // Start from an all-zero header and fill in only the fields used here.
        let mut msg: libc::msghdr = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
        msg.msg_name = dest_sockaddr as *mut libc::c_void;
        msg.msg_namelen = dest_addrlen;
        msg.msg_iov = &mut iov;
        msg.msg_iovlen = 1;

        // A failed send panics inside the spawned task, carrying the OS error.
        if unsafe { libc::sendmsg(socket_fd, &msg, 0) } == -1 {
            Err(std::io::Error::last_os_error()).unwrap()
        }
    });

    let fd = AsyncFd::new(socket).unwrap();

    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}
807