1 #![cfg(any(target_os = "android", target_os = "linux"))]
2
3 use rustix::fd::{AsFd, OwnedFd};
4 use rustix::io::epoll::{self, Epoll};
5 use rustix::io::{ioctl_fionbio, read, write};
6 use rustix::net::{
7 accept, bind_v4, connect_v4, getsockname, listen, socket, AddressFamily, Ipv4Addr, Protocol,
8 SocketAddrAny, SocketAddrV4, SocketType,
9 };
10 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
11 use std::sync::{Arc, Condvar, Mutex};
12 use std::thread;
13
/// Length in bytes of the buffer the client reads the server's greeting into.
const BUFFER_SIZE: usize = 20;
15
server(ready: Arc<(Mutex<u16>, Condvar)>)16 fn server(ready: Arc<(Mutex<u16>, Condvar)>) {
17 let listen_sock = socket(AddressFamily::INET, SocketType::STREAM, Protocol::default()).unwrap();
18 bind_v4(&listen_sock, &SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).unwrap();
19 listen(&listen_sock, 1).unwrap();
20
21 let who = match getsockname(&listen_sock).unwrap() {
22 SocketAddrAny::V4(addr) => addr,
23 _ => panic!(),
24 };
25
26 {
27 let (lock, cvar) = &*ready;
28 let mut port = lock.lock().unwrap();
29 *port = who.port();
30 cvar.notify_all();
31 }
32
33 let epoll = Epoll::new(epoll::CreateFlags::CLOEXEC, epoll::Owning::<OwnedFd>::new()).unwrap();
34
35 // Test into conversions.
36 let fd: OwnedFd = epoll.into();
37 let epoll: Epoll<epoll::Owning<OwnedFd>> = fd.into();
38 let fd: RawFd = epoll.into_raw_fd();
39 let epoll = unsafe { Epoll::<epoll::Owning<OwnedFd>>::from_raw_fd(fd) };
40
41 let raw_listen_sock = listen_sock.as_fd().as_raw_fd();
42 epoll.add(listen_sock, epoll::EventFlags::IN).unwrap();
43
44 let mut event_list = epoll::EventVec::with_capacity(4);
45 loop {
46 epoll.wait(&mut event_list, -1).unwrap();
47 for (_event_flags, target) in &event_list {
48 if target.as_raw_fd() == raw_listen_sock {
49 let conn_sock = accept(&*target).unwrap();
50 ioctl_fionbio(&conn_sock, true).unwrap();
51 epoll
52 .add(conn_sock, epoll::EventFlags::OUT | epoll::EventFlags::ET)
53 .unwrap();
54 } else {
55 write(&*target, b"hello\n").unwrap();
56 let _ = epoll.del(target).unwrap();
57 }
58 }
59 }
60 }
61
client(ready: Arc<(Mutex<u16>, Condvar)>)62 fn client(ready: Arc<(Mutex<u16>, Condvar)>) {
63 let port = {
64 let (lock, cvar) = &*ready;
65 let mut port = lock.lock().unwrap();
66 while *port == 0 {
67 port = cvar.wait(port).unwrap();
68 }
69 *port
70 };
71
72 let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port);
73 let mut buffer = vec![0; BUFFER_SIZE];
74
75 for _ in 0..16 {
76 let data_socket =
77 socket(AddressFamily::INET, SocketType::STREAM, Protocol::default()).unwrap();
78 connect_v4(&data_socket, &addr).unwrap();
79
80 let nread = read(&data_socket, &mut buffer).unwrap();
81 assert_eq!(String::from_utf8_lossy(&buffer[..nread]), "hello\n");
82 }
83 }
84
/// Runs the epoll server and the client on separate named threads.
///
/// Both threads share a `(Mutex<u16>, Condvar)` pair through which the server
/// announces its listening port. Only the client thread is joined: the
/// server's event loop never returns, so its handle is simply kept unused.
#[test]
fn test_epoll() {
    let ready = Arc::new((Mutex::new(0_u16), Condvar::new()));

    let server_ready = Arc::clone(&ready);
    let _server = thread::Builder::new()
        .name("server".to_string())
        .spawn(move || server(server_ready))
        .unwrap();

    let client_thread = thread::Builder::new()
        .name("client".to_string())
        .spawn(move || client(ready))
        .unwrap();

    // A panic inside the client (e.g. a failed assertion) surfaces here.
    client_thread.join().unwrap();
}
104