• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg(any(target_os = "android", target_os = "linux"))]
2 
3 use rustix::fd::{AsFd, OwnedFd};
4 use rustix::io::epoll::{self, Epoll};
5 use rustix::io::{ioctl_fionbio, read, write};
6 use rustix::net::{
7     accept, bind_v4, connect_v4, getsockname, listen, socket, AddressFamily, Ipv4Addr, Protocol,
8     SocketAddrAny, SocketAddrV4, SocketType,
9 };
10 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
11 use std::sync::{Arc, Condvar, Mutex};
12 use std::thread;
13 
14 const BUFFER_SIZE: usize = 20;
15 
server(ready: Arc<(Mutex<u16>, Condvar)>)16 fn server(ready: Arc<(Mutex<u16>, Condvar)>) {
17     let listen_sock = socket(AddressFamily::INET, SocketType::STREAM, Protocol::default()).unwrap();
18     bind_v4(&listen_sock, &SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)).unwrap();
19     listen(&listen_sock, 1).unwrap();
20 
21     let who = match getsockname(&listen_sock).unwrap() {
22         SocketAddrAny::V4(addr) => addr,
23         _ => panic!(),
24     };
25 
26     {
27         let (lock, cvar) = &*ready;
28         let mut port = lock.lock().unwrap();
29         *port = who.port();
30         cvar.notify_all();
31     }
32 
33     let epoll = Epoll::new(epoll::CreateFlags::CLOEXEC, epoll::Owning::<OwnedFd>::new()).unwrap();
34 
35     // Test into conversions.
36     let fd: OwnedFd = epoll.into();
37     let epoll: Epoll<epoll::Owning<OwnedFd>> = fd.into();
38     let fd: RawFd = epoll.into_raw_fd();
39     let epoll = unsafe { Epoll::<epoll::Owning<OwnedFd>>::from_raw_fd(fd) };
40 
41     let raw_listen_sock = listen_sock.as_fd().as_raw_fd();
42     epoll.add(listen_sock, epoll::EventFlags::IN).unwrap();
43 
44     let mut event_list = epoll::EventVec::with_capacity(4);
45     loop {
46         epoll.wait(&mut event_list, -1).unwrap();
47         for (_event_flags, target) in &event_list {
48             if target.as_raw_fd() == raw_listen_sock {
49                 let conn_sock = accept(&*target).unwrap();
50                 ioctl_fionbio(&conn_sock, true).unwrap();
51                 epoll
52                     .add(conn_sock, epoll::EventFlags::OUT | epoll::EventFlags::ET)
53                     .unwrap();
54             } else {
55                 write(&*target, b"hello\n").unwrap();
56                 let _ = epoll.del(target).unwrap();
57             }
58         }
59     }
60 }
61 
client(ready: Arc<(Mutex<u16>, Condvar)>)62 fn client(ready: Arc<(Mutex<u16>, Condvar)>) {
63     let port = {
64         let (lock, cvar) = &*ready;
65         let mut port = lock.lock().unwrap();
66         while *port == 0 {
67             port = cvar.wait(port).unwrap();
68         }
69         *port
70     };
71 
72     let addr = SocketAddrV4::new(Ipv4Addr::LOCALHOST, port);
73     let mut buffer = vec![0; BUFFER_SIZE];
74 
75     for _ in 0..16 {
76         let data_socket =
77             socket(AddressFamily::INET, SocketType::STREAM, Protocol::default()).unwrap();
78         connect_v4(&data_socket, &addr).unwrap();
79 
80         let nread = read(&data_socket, &mut buffer).unwrap();
81         assert_eq!(String::from_utf8_lossy(&buffer[..nread]), "hello\n");
82     }
83 }
84 
85 #[test]
test_epoll()86 fn test_epoll() {
87     let ready = Arc::new((Mutex::new(0_u16), Condvar::new()));
88     let ready_clone = Arc::clone(&ready);
89 
90     let _server = thread::Builder::new()
91         .name("server".to_string())
92         .spawn(move || {
93             server(ready);
94         })
95         .unwrap();
96     let client = thread::Builder::new()
97         .name("client".to_string())
98         .spawn(move || {
99             client(ready_clone);
100         })
101         .unwrap();
102     client.join().unwrap();
103 }
104