#![warn(rust_2018_idioms)]
#![cfg(all(unix, feature = "full"))]

use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};
use std::time::Duration;
use std::{
    future::Future,
    io::{self, ErrorKind, Read, Write},
    task::{Context, Waker},
};

use nix::unistd::{read, write};

use futures::poll;

use tokio::io::unix::{AsyncFd, AsyncFdReadyGuard};
use tokio_test::{assert_err, assert_pending};

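// A `Waker` that records whether it has been woken, so tests can poll futures
// by hand and assert on wakeups without running a real task.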
struct TestWaker {
    inner: Arc<TestWakerInner>,
    waker: Waker,
}

#[derive(Default)]
struct TestWakerInner {
    awoken: AtomicBool,
}

impl futures::task::ArcWake for TestWakerInner {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        arc_self.awoken.store(true, Ordering::SeqCst);
    }
}

impl TestWaker {
    fn new() -> Self {
        let inner: Arc<TestWakerInner> = Default::default();

        Self {
            inner: inner.clone(),
            waker: futures::task::waker(inner),
        }
    }

    fn awoken(&self) -> bool {
        self.inner.awoken.swap(false, Ordering::SeqCst)
    }

    fn context(&self) -> Context<'_> {
        Context::from_waker(&self.waker)
    }
}

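// Minimal Read/Write wrapper around an `OwnedFd`, backed by nix `read`/`write`,
// used as the inner type registered with `AsyncFd` throughout these tests.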
#[derive(Debug)]
struct FileDescriptor {
    fd: std::os::fd::OwnedFd,
}

impl AsRawFd for FileDescriptor {
    fn as_raw_fd(&self) -> RawFd {
        self.fd.as_raw_fd()
    }
}

impl Read for &FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        read(self.fd.as_raw_fd(), buf).map_err(io::Error::from)
    }
}

impl Read for FileDescriptor {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        (self as &Self).read(buf)
    }
}

impl Write for &FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        write(&self.fd, buf).map_err(io::Error::from)
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

impl Write for FileDescriptor {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        (self as &Self).write(buf)
    }

    fn flush(&mut self) -> io::Result<()> {
        (self as &Self).flush()
    }
}

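// Switch `fd` to non-blocking mode so reads and writes return `WouldBlock`
// instead of blocking the test.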
fn set_nonblocking(fd: RawFd) {
    use nix::fcntl::{OFlag, F_GETFL, F_SETFL};

    let flags = nix::fcntl::fcntl(fd, F_GETFL).expect("fcntl(F_GETFL)");

    if flags < 0 {
        panic!(
            "bad return value from fcntl(F_GETFL): {} ({:?})",
            flags,
            nix::Error::last()
        );
    }

    let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;

    nix::fcntl::fcntl(fd, F_SETFL(flags)).expect("fcntl(F_SETFL)");
}

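// Create a connected pair of non-blocking Unix stream sockets.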
fn socketpair() -> (FileDescriptor, FileDescriptor) {
    use nix::sys::socket::{self, AddressFamily, SockFlag, SockType};

    let (fd_a, fd_b) = socket::socketpair(
        AddressFamily::Unix,
        SockType::Stream,
        None,
        SockFlag::empty(),
    )
    .expect("socketpair");
    let fds = (FileDescriptor { fd: fd_a }, FileDescriptor { fd: fd_b });

    set_nonblocking(fds.0.fd.as_raw_fd());
    set_nonblocking(fds.1.fd.as_raw_fd());

    fds
}

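// Read from `fd` until it returns `WouldBlock`, so the peer's send buffer has
// room again and the peer becomes writable.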
fn drain(mut fd: &FileDescriptor) {
    let mut buf = [0u8; 512];

    loop {
        match fd.read(&mut buf[..]) {
            Err(e) if e.kind() == ErrorKind::WouldBlock => break,
            Ok(0) => panic!("unexpected EOF"),
            Err(e) => panic!("unexpected error: {:?}", e),
            Ok(_) => continue,
        }
    }
}

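// A fresh socketpair is immediately writable on both ends, but neither end is
// readable until the peer writes something.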
#[tokio::test]
async fn initially_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let afd_b = AsyncFd::new(b).unwrap();

    afd_a.writable().await.unwrap().clear_ready();
    afd_b.writable().await.unwrap().clear_ready();

    tokio::select! {
        biased;
        _ = tokio::time::sleep(Duration::from_millis(10)) => {},
        _ = afd_a.readable() => panic!("Unexpected readable state"),
        _ = afd_b.readable() => panic!("Unexpected readable state"),
    }
}

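// Readiness stays cached until explicitly cleared: `retain_ready` keeps the
// cached state, while after `clear_ready` a fresh event from the peer is
// needed before `readable()` completes again.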
#[tokio::test]
async fn reset_readable() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    let mut guard = readable.await.unwrap();

    guard
        .try_io(|_| afd_a.get_ref().read(&mut [0]))
        .unwrap()
        .unwrap();

    // `a` is not readable, but the reactor still thinks it is
    // (because we have not observed a not-ready error yet)
    afd_a.readable().await.unwrap().retain_ready();

    // Explicitly clear the ready state
    guard.clear_ready();

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    b.write_all(b"0").unwrap();

    // We can observe the new readable event
    afd_a.readable().await.unwrap().clear_ready();
}

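// Observing `WouldBlock` through `try_io` clears the cached writable state;
// draining the peer makes the socket writable again.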
#[tokio::test]
async fn reset_writable() {
    let (a, b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.writable().await.unwrap();

    // Write until we get a WouldBlock. This also clears the ready state.
    while guard
        .try_io(|_| afd_a.get_ref().write(&[0; 512][..]))
        .is_ok()
    {}

    // Writable state should be cleared now.
    let writable = afd_a.writable();
    tokio::pin!(writable);

    tokio::select! {
        _ = writable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Read from the other side; we should become writable now.
    drain(&b);

    let _ = writable.await.unwrap();
}

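// An `AsRawFd` wrapper around `Arc<T>`: dropping the `AsyncFd` drops only this
// wrapper, not the fd shared through the `Arc`, so close-on-drop is delegated
// to the inner type.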
#[derive(Debug)]
struct ArcFd<T>(Arc<T>);
impl<T: AsRawFd> AsRawFd for ArcFd<T> {
    fn as_raw_fd(&self) -> RawFd {
        self.0.as_raw_fd()
    }
}

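// Dropping an `AsyncFd<FileDescriptor>` closes the fd (the peer sees EOF),
// `into_inner` hands the fd back without closing it, and a wrapper type like
// `ArcFd` decides for itself whether dropping closes anything.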
#[tokio::test]
async fn drop_closes() {
    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(afd_a);

    assert_eq!(0, b.read(&mut [0]).unwrap());

    // into_inner does not close the fd

    let (a, mut b) = socketpair();
    let afd_a = AsyncFd::new(a).unwrap();
    let _a: FileDescriptor = afd_a.into_inner();

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    // Drop closure behavior is delegated to the inner object
    let (a, mut b) = socketpair();
    let arc_fd = Arc::new(a);
    let afd_a = AsyncFd::new(ArcFd(arc_fd.clone())).unwrap();
    std::mem::drop(afd_a);

    assert_eq!(
        ErrorKind::WouldBlock,
        b.read(&mut [0]).err().unwrap().kind()
    );

    std::mem::drop(arc_fd); // suppress unnecessary clone clippy warning
}

#[tokio::test]
async fn reregister() {
    let (a, _b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    let a = afd_a.into_inner();
    AsyncFd::new(a).unwrap();
}

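// `try_io` only clears the cached readiness when the closure reports
// `WouldBlock`; any other result leaves the ready state untouched.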
#[tokio::test]
async fn try_io() {
    let (a, mut b) = socketpair();

    b.write_all(b"0").unwrap();

    let afd_a = AsyncFd::new(a).unwrap();

    let mut guard = afd_a.readable().await.unwrap();

    afd_a.get_ref().read_exact(&mut [0]).unwrap();

    // Should not clear the readable state
    let _ = guard.try_io(|_| Ok(()));

    // Still readable...
    let _ = afd_a.readable().await.unwrap();

    // Should clear the readable state
    let _ = guard.try_io(|_| io::Result::<()>::Err(ErrorKind::WouldBlock.into()));

    // Assert not readable
    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = readable.as_mut() => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    // Write something to b again and make sure we're reawoken
    b.write_all(b"0").unwrap();
    let _ = readable.await.unwrap();
}

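// Ten tasks wait on the same `AsyncFd` at once; after the peer writes a single
// byte, every task must eventually observe readiness and finish.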
#[tokio::test]
async fn multiple_waiters() {
    let (a, mut b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());

    let barrier = Arc::new(tokio::sync::Barrier::new(11));

    let mut tasks = Vec::new();
    for _ in 0..10 {
        let afd_a = afd_a.clone();
        let barrier = barrier.clone();

        let f = async move {
            let notify_barrier = async {
                barrier.wait().await;
                futures::future::pending::<()>().await;
            };

            tokio::select! {
                biased;
                guard = afd_a.readable() => {
                    tokio::task::yield_now().await;
                    guard.unwrap().clear_ready()
                },
                _ = notify_barrier => unreachable!(),
            }

            std::mem::drop(afd_a);
        };

        tasks.push(tokio::spawn(f));
    }

    let mut all_tasks = futures::future::try_join_all(tasks);

    tokio::select! {
        r = std::pin::Pin::new(&mut all_tasks) => {
            r.unwrap(); // propagate panic
            panic!("Tasks exited unexpectedly")
        },
        _ = barrier.wait() => {}
    };

    b.write_all(b"0").unwrap();

    all_tasks.await.unwrap();
}

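// Exercises the low-level poll_read_ready/poll_write_ready API: readiness is
// reported to the most recently registered waker, and waking the read side
// must not disturb a pending write-side waker.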
#[tokio::test]
async fn poll_fns() {
    let (a, b) = socketpair();
    let afd_a = Arc::new(AsyncFd::new(a).unwrap());
    let afd_b = Arc::new(AsyncFd::new(b).unwrap());

    // Fill up the write side of A
    while afd_a.get_ref().write(&[0; 512]).is_ok() {}

    let waker = TestWaker::new();

    assert_pending!(afd_a.as_ref().poll_read_ready(&mut waker.context()));

    let afd_a_2 = afd_a.clone();
    let r_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = r_barrier.clone();

    let read_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_read_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_read_ready(cx)).await;
    });

    let afd_a_2 = afd_a.clone();
    let w_barrier = Arc::new(tokio::sync::Barrier::new(2));
    let barrier_clone = w_barrier.clone();

    let mut write_fut = tokio::spawn(async move {
        // Move waker onto this task first
        assert_pending!(poll!(futures::future::poll_fn(|cx| afd_a_2
            .as_ref()
            .poll_write_ready(cx))));
        barrier_clone.wait().await;

        let _ = futures::future::poll_fn(|cx| afd_a_2.as_ref().poll_write_ready(cx)).await;
    });

    r_barrier.wait().await;
    w_barrier.wait().await;

    let readable = afd_a.readable();
    tokio::pin!(readable);

    tokio::select! {
        _ = &mut readable => unreachable!(),
        _ = tokio::task::yield_now() => {}
    }

    // Make A readable. We expect that 'readable' and 'read_fut' will both complete quickly
    afd_b.get_ref().write_all(b"0").unwrap();

    let _ = tokio::join!(readable, read_fut);

    // Our original waker should _not_ be awoken (poll_read_ready retains only the last context)
    assert!(!waker.awoken());

    // The writable side should not be awoken
    tokio::select! {
        _ = &mut write_fut => unreachable!(),
        _ = tokio::time::sleep(Duration::from_millis(5)) => {}
    }

    // Make it writable now
    drain(afd_b.get_ref());

    // Now we should be writable (i.e. the waker for poll_write should still be
    // registered after we wake the read side)
    let _ = write_fut.await;
}

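// Poll `f` once with a no-op waker, assert that it is pending, and hand it
// back pinned so the test can poll it again later.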
fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
    let mut pinned = Box::pin(f);

    assert_pending!(pinned
        .as_mut()
        .poll(&mut Context::from_waker(futures::task::noop_waker_ref())));

    pinned
}

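// Build a current-thread runtime that the shutdown tests can drop explicitly.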
fn rt() -> tokio::runtime::Runtime {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .unwrap()
}

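// After the runtime (and its I/O driver) is dropped, pending readiness futures
// are woken and new readiness calls return errors rather than hanging.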
#[test]
fn driver_shutdown_wakes_currently_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    let readable = assert_pending(afd_a.readable());

    std::mem::drop(rt);

    // The future was initialized **before** dropping the rt
    assert_err!(futures::executor::block_on(readable));

    // The future is initialized **after** dropping the rt.
    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_future_pending() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_pending_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // This may or may not return an error (but will be awoken)
        let _ = futures::executor::block_on(afd_a.readable());

        // However retrying will always return an error
        assert_err!(futures::executor::block_on(afd_a.readable()));
    }
}

async fn poll_readable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
}

async fn poll_writable<T: AsRawFd>(fd: &AsyncFd<T>) -> std::io::Result<AsyncFdReadyGuard<'_, T>> {
    futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
}

#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

    let readable = assert_pending(poll_readable(&afd_a));
    let writable = assert_pending(poll_writable(&afd_a));

    std::mem::drop(rt);

    // Attempting to poll readiness when the rt is dropped is an error
    assert_err!(futures::executor::block_on(readable));
    assert_err!(futures::executor::block_on(writable));
}

#[test]
fn driver_shutdown_wakes_poll() {
    let rt = rt();

    let (a, _b) = socketpair();
    let afd_a = {
        let _enter = rt.enter();
        AsyncFd::new(a).unwrap()
    };

    std::mem::drop(rt);

    assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
    assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
}

#[test]
fn driver_shutdown_wakes_poll_race() {
    // TODO: make this a loom test
    for _ in 0..100 {
        let rt = rt();

        let (a, _b) = socketpair();
        let afd_a = {
            let _enter = rt.enter();
            AsyncFd::new(a).unwrap()
        };

        while afd_a.get_ref().write(&[0; 512]).is_ok() {} // make not writable

        let _ = std::thread::spawn(move || std::mem::drop(rt));

        // The poll variants will always return an error in this case
        assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
        assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
    }
}

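// MSG_OOB data on a TCP stream surfaces as a priority event when the AsyncFd
// is registered with Interest::PRIORITY.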
#[tokio::test]
#[cfg(any(target_os = "linux", target_os = "android"))]
async fn priority_event_on_oob_data() {
    use std::net::SocketAddr;

    use tokio::io::Interest;

    let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();

    let listener = std::net::TcpListener::bind(addr).unwrap();
    let addr = listener.local_addr().unwrap();

    let client = std::net::TcpStream::connect(addr).unwrap();
    let client = AsyncFd::with_interest(client, Interest::PRIORITY).unwrap();

    let (stream, _) = listener.accept().unwrap();

    // Sending out of band data should trigger priority event.
    send_oob_data(&stream, b"hello").unwrap();

    let _ = client.ready(Interest::PRIORITY).await.unwrap();
}

#[cfg(any(target_os = "linux", target_os = "android"))]
fn send_oob_data<S: AsRawFd>(stream: &S, data: &[u8]) -> io::Result<usize> {
    unsafe {
        let res = libc::send(
            stream.as_raw_fd(),
            data.as_ptr().cast(),
            data.len(),
            libc::MSG_OOB,
        );
        if res == -1 {
            Err(io::Error::last_os_error())
        } else {
            Ok(res as usize)
        }
    }
}

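// `clear_ready_matching` clears only the selected readiness bits and leaves
// the rest of the guard's ready set intact.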
#[tokio::test]
async fn clear_ready_matching_clears_ready() {
    use tokio::io::{Interest, Ready};

    let (a, mut b) = socketpair();

    let afd_a = AsyncFd::new(a).unwrap();
    b.write_all(b"0").unwrap();

    let mut guard = afd_a
        .ready(Interest::READABLE | Interest::WRITABLE)
        .await
        .unwrap();

    assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);

    guard.clear_ready_matching(Ready::READABLE);
    assert_eq!(guard.ready(), Ready::WRITABLE);

    guard.clear_ready_matching(Ready::WRITABLE);
    assert_eq!(guard.ready(), Ready::EMPTY);
}

#[tokio::test]
async fn clear_ready_matching_clears_ready_mut() {
    use tokio::io::{Interest, Ready};

    let (a, mut b) = socketpair();

    let mut afd_a = AsyncFd::new(a).unwrap();
    b.write_all(b"0").unwrap();

    let mut guard = afd_a
        .ready_mut(Interest::READABLE | Interest::WRITABLE)
        .await
        .unwrap();

    assert_eq!(guard.ready(), Ready::READABLE | Ready::WRITABLE);

    guard.clear_ready_matching(Ready::READABLE);
    assert_eq!(guard.ready(), Ready::WRITABLE);

    guard.clear_ready_matching(Ready::WRITABLE);
    assert_eq!(guard.ready(), Ready::EMPTY);
}

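// With software TX timestamping enabled, sending a datagram queues the send
// timestamp on the socket's error queue, which surfaces as error readiness.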
#[tokio::test]
#[cfg(target_os = "linux")]
async fn await_error_readiness_timestamping() {
    use std::net::{Ipv4Addr, SocketAddr};

    use tokio::io::{Interest, Ready};

    let address_a = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let address_b = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));

    let socket = std::net::UdpSocket::bind(address_a).unwrap();

    socket.set_nonblocking(true).unwrap();

    // configure send timestamps
    configure_timestamping_socket(&socket).unwrap();

    socket.connect(address_b).unwrap();

    let fd = AsyncFd::new(socket).unwrap();

    tokio::select! {
        _ = fd.ready(Interest::ERROR) => panic!(),
        _ = tokio::time::sleep(Duration::from_millis(10)) => {}
    }

    let buf = b"hello there";
    fd.get_ref().send(buf).unwrap();

    // the send timestamp should now be in the error queue
    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}

#[cfg(target_os = "linux")]
fn configure_timestamping_socket(udp_socket: &std::net::UdpSocket) -> std::io::Result<libc::c_int> {
    // enable software timestamping, and specifically software send timestamping;
    // the SOF_TIMESTAMPING_* flags are set via the SO_TIMESTAMPING socket option
    let options = libc::SOF_TIMESTAMPING_SOFTWARE | libc::SOF_TIMESTAMPING_TX_SOFTWARE;

    let res = unsafe {
        libc::setsockopt(
            udp_socket.as_raw_fd(),
            libc::SOL_SOCKET,
            libc::SO_TIMESTAMPING,
            &options as *const _ as *const libc::c_void,
            std::mem::size_of_val(&options) as libc::socklen_t,
        )
    };

    if res == -1 {
        Err(std::io::Error::last_os_error())
    } else {
        Ok(res)
    }
}

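// With IP_RECVERR enabled, sending to a port nobody listens on gets the ICMP
// "port unreachable" error queued on the socket, surfacing as error readiness.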
#[tokio::test]
#[cfg(target_os = "linux")]
async fn await_error_readiness_invalid_address() {
    use std::net::{Ipv4Addr, SocketAddr};
    use tokio::io::{Interest, Ready};

    let socket_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 0));
    let socket = std::net::UdpSocket::bind(socket_addr).unwrap();
    let socket_fd = socket.as_raw_fd();

    // Enable IP_RECVERR option to receive error messages
    // https://man7.org/linux/man-pages/man7/ip.7.html has some extra information
    let recv_err: libc::c_int = 1;
    unsafe {
        let res = libc::setsockopt(
            socket.as_raw_fd(),
            libc::SOL_IP,
            libc::IP_RECVERR,
            &recv_err as *const _ as *const libc::c_void,
            std::mem::size_of_val(&recv_err) as libc::socklen_t,
        );
        if res == -1 {
            panic!("{:?}", std::io::Error::last_os_error());
        }
    }

    // Spawn a separate task for sending messages
    tokio::spawn(async move {
        // Set the destination address. This address is invalid in this context; the OS will
        // notice that nobody is listening on this port. Normally this is ignored (UDP is
        // "fire and forget"), but because IP_RECVERR is enabled, the error will actually be
        // reported to the sending socket.
        let mut dest_addr =
            unsafe { std::mem::MaybeUninit::<libc::sockaddr_in>::zeroed().assume_init() };
        dest_addr.sin_family = libc::AF_INET as _;
        // Based on https://en.wikipedia.org/wiki/Ephemeral_port, we should pick a port number
        // below 1024 to guarantee that other tests don't select this port by accident when they
        // use port 0 to select an ephemeral port.
        dest_addr.sin_port = 512u16.to_be(); // Destination port

        // Prepare the message data
        let message = "Hello, Socket!";

        // Prepare the message structure for sendmsg
        let mut iov = libc::iovec {
            iov_base: message.as_ptr() as *mut libc::c_void,
            iov_len: message.len(),
        };

        // Prepare the destination address for the sendmsg call
        let dest_sockaddr: *const libc::sockaddr = &dest_addr as *const _ as *const libc::sockaddr;
        let dest_addrlen: libc::socklen_t = std::mem::size_of_val(&dest_addr) as libc::socklen_t;

        let mut msg: libc::msghdr = unsafe { std::mem::MaybeUninit::zeroed().assume_init() };
        msg.msg_name = dest_sockaddr as *mut libc::c_void;
        msg.msg_namelen = dest_addrlen;
        msg.msg_iov = &mut iov;
        msg.msg_iovlen = 1;

        if unsafe { libc::sendmsg(socket_fd, &msg, 0) } == -1 {
            Err(std::io::Error::last_os_error()).unwrap()
        }
    });

    let fd = AsyncFd::new(socket).unwrap();

    let guard = fd.ready(Interest::ERROR).await.unwrap();
    assert_eq!(guard.ready(), Ready::ERROR);
}