1 use crate::{Interest, Token};
2
3 use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLRDHUP};
4 use log::error;
5 use std::os::unix::io::{AsRawFd, RawFd};
6 #[cfg(debug_assertions)]
7 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 use std::time::Duration;
9 use std::{cmp, i32, io, ptr};
10
11 /// Unique id for use as `SelectorId`.
12 #[cfg(debug_assertions)]
13 static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
14
15 #[derive(Debug)]
16 pub struct Selector {
17 #[cfg(debug_assertions)]
18 id: usize,
19 ep: RawFd,
20 #[cfg(debug_assertions)]
21 has_waker: AtomicBool,
22 }
23
24 impl Selector {
new() -> io::Result<Selector>25 pub fn new() -> io::Result<Selector> {
26 #[cfg(not(target_os = "android"))]
27 let res = syscall!(epoll_create1(libc::EPOLL_CLOEXEC));
28
29 // On Android < API level 16 `epoll_create1` is not defined, so use a
30 // raw system call.
31 // According to libuv, `EPOLL_CLOEXEC` is not defined on Android API <
32 // 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform,
33 // so we use it instead.
34 #[cfg(target_os = "android")]
35 let res = syscall!(syscall(libc::SYS_epoll_create1, libc::O_CLOEXEC));
36
37 let ep = match res {
38 Ok(ep) => ep as RawFd,
39 Err(err) => {
40 // When `epoll_create1` is not available fall back to use
41 // `epoll_create` followed by `fcntl`.
42 if let Some(libc::ENOSYS) = err.raw_os_error() {
43 match syscall!(epoll_create(1024)) {
44 Ok(ep) => match syscall!(fcntl(ep, libc::F_SETFD, libc::FD_CLOEXEC)) {
45 Ok(ep) => ep as RawFd,
46 Err(err) => {
47 // `fcntl` failed, cleanup `ep`.
48 let _ = unsafe { libc::close(ep) };
49 return Err(err);
50 }
51 },
52 Err(err) => return Err(err),
53 }
54 } else {
55 return Err(err);
56 }
57 }
58 };
59
60 Ok(Selector {
61 #[cfg(debug_assertions)]
62 id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
63 ep,
64 #[cfg(debug_assertions)]
65 has_waker: AtomicBool::new(false),
66 })
67 }
68
try_clone(&self) -> io::Result<Selector>69 pub fn try_clone(&self) -> io::Result<Selector> {
70 syscall!(fcntl(self.ep, libc::F_DUPFD_CLOEXEC, super::LOWEST_FD)).map(|ep| Selector {
71 // It's the same selector, so we use the same id.
72 #[cfg(debug_assertions)]
73 id: self.id,
74 ep,
75 #[cfg(debug_assertions)]
76 has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
77 })
78 }
79
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>80 pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
81 // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ
82 // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits
83 // architectures. The magic number is the same constant used by libuv.
84 #[cfg(target_pointer_width = "32")]
85 const MAX_SAFE_TIMEOUT: u128 = 1789569;
86 #[cfg(not(target_pointer_width = "32"))]
87 const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128;
88
89 let timeout = timeout
90 .map(|to| {
91 // `Duration::as_millis` truncates, so round up. This avoids
92 // turning sub-millisecond timeouts into a zero timeout, unless
93 // the caller explicitly requests that by specifying a zero
94 // timeout.
95 let to_ms = (to + Duration::from_nanos(999_999)).as_millis();
96 cmp::min(MAX_SAFE_TIMEOUT, to_ms) as libc::c_int
97 })
98 .unwrap_or(-1);
99
100 events.clear();
101 syscall!(epoll_wait(
102 self.ep,
103 events.as_mut_ptr(),
104 events.capacity() as i32,
105 timeout,
106 ))
107 .map(|n_events| {
108 // This is safe because `epoll_wait` ensures that `n_events` are
109 // assigned.
110 unsafe { events.set_len(n_events as usize) };
111 })
112 }
113
register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>114 pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
115 let mut event = libc::epoll_event {
116 events: interests_to_epoll(interests),
117 u64: usize::from(token) as u64,
118 #[cfg(target_os = "redox")]
119 _pad: 0,
120 };
121
122 syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
123 }
124
reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>125 pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
126 let mut event = libc::epoll_event {
127 events: interests_to_epoll(interests),
128 u64: usize::from(token) as u64,
129 #[cfg(target_os = "redox")]
130 _pad: 0,
131 };
132
133 syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ())
134 }
135
deregister(&self, fd: RawFd) -> io::Result<()>136 pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
137 syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
138 }
139
140 #[cfg(debug_assertions)]
register_waker(&self) -> bool141 pub fn register_waker(&self) -> bool {
142 self.has_waker.swap(true, Ordering::AcqRel)
143 }
144 }
145
146 cfg_io_source! {
147 impl Selector {
148 #[cfg(debug_assertions)]
149 pub fn id(&self) -> usize {
150 self.id
151 }
152 }
153 }
154
155 impl AsRawFd for Selector {
as_raw_fd(&self) -> RawFd156 fn as_raw_fd(&self) -> RawFd {
157 self.ep
158 }
159 }
160
161 impl Drop for Selector {
drop(&mut self)162 fn drop(&mut self) {
163 if let Err(err) = syscall!(close(self.ep)) {
164 error!("error closing epoll: {}", err);
165 }
166 }
167 }
168
interests_to_epoll(interests: Interest) -> u32169 fn interests_to_epoll(interests: Interest) -> u32 {
170 let mut kind = EPOLLET;
171
172 if interests.is_readable() {
173 kind = kind | EPOLLIN | EPOLLRDHUP;
174 }
175
176 if interests.is_writable() {
177 kind |= EPOLLOUT;
178 }
179
180 if interests.is_priority() {
181 kind |= EPOLLPRI;
182 }
183
184 kind as u32
185 }
186
187 pub type Event = libc::epoll_event;
188 pub type Events = Vec<Event>;
189
190 pub mod event {
191 use std::fmt;
192
193 use crate::sys::Event;
194 use crate::Token;
195
token(event: &Event) -> Token196 pub fn token(event: &Event) -> Token {
197 Token(event.u64 as usize)
198 }
199
is_readable(event: &Event) -> bool200 pub fn is_readable(event: &Event) -> bool {
201 (event.events as libc::c_int & libc::EPOLLIN) != 0
202 || (event.events as libc::c_int & libc::EPOLLPRI) != 0
203 }
204
is_writable(event: &Event) -> bool205 pub fn is_writable(event: &Event) -> bool {
206 (event.events as libc::c_int & libc::EPOLLOUT) != 0
207 }
208
is_error(event: &Event) -> bool209 pub fn is_error(event: &Event) -> bool {
210 (event.events as libc::c_int & libc::EPOLLERR) != 0
211 }
212
is_read_closed(event: &Event) -> bool213 pub fn is_read_closed(event: &Event) -> bool {
214 // Both halves of the socket have closed
215 event.events as libc::c_int & libc::EPOLLHUP != 0
216 // Socket has received FIN or called shutdown(SHUT_RD)
217 || (event.events as libc::c_int & libc::EPOLLIN != 0
218 && event.events as libc::c_int & libc::EPOLLRDHUP != 0)
219 }
220
is_write_closed(event: &Event) -> bool221 pub fn is_write_closed(event: &Event) -> bool {
222 // Both halves of the socket have closed
223 event.events as libc::c_int & libc::EPOLLHUP != 0
224 // Unix pipe write end has closed
225 || (event.events as libc::c_int & libc::EPOLLOUT != 0
226 && event.events as libc::c_int & libc::EPOLLERR != 0)
227 // The other side (read end) of a Unix pipe has closed.
228 || event.events as libc::c_int == libc::EPOLLERR
229 }
230
is_priority(event: &Event) -> bool231 pub fn is_priority(event: &Event) -> bool {
232 (event.events as libc::c_int & libc::EPOLLPRI) != 0
233 }
234
is_aio(_: &Event) -> bool235 pub fn is_aio(_: &Event) -> bool {
236 // Not supported in the kernel, only in libc.
237 false
238 }
239
is_lio(_: &Event) -> bool240 pub fn is_lio(_: &Event) -> bool {
241 // Not supported.
242 false
243 }
244
debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result245 pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
246 #[allow(clippy::trivially_copy_pass_by_ref)]
247 fn check_events(got: &u32, want: &libc::c_int) -> bool {
248 (*got as libc::c_int & want) != 0
249 }
250 debug_detail!(
251 EventsDetails(u32),
252 check_events,
253 libc::EPOLLIN,
254 libc::EPOLLPRI,
255 libc::EPOLLOUT,
256 libc::EPOLLRDNORM,
257 libc::EPOLLRDBAND,
258 libc::EPOLLWRNORM,
259 libc::EPOLLWRBAND,
260 libc::EPOLLMSG,
261 libc::EPOLLERR,
262 libc::EPOLLHUP,
263 libc::EPOLLET,
264 libc::EPOLLRDHUP,
265 libc::EPOLLONESHOT,
266 #[cfg(target_os = "linux")]
267 libc::EPOLLEXCLUSIVE,
268 #[cfg(any(target_os = "android", target_os = "linux"))]
269 libc::EPOLLWAKEUP,
270 libc::EPOLL_CLOEXEC,
271 );
272
273 // Can't reference fields in packed structures.
274 let e_u64 = event.u64;
275 f.debug_struct("epoll_event")
276 .field("events", &EventsDetails(event.events))
277 .field("u64", &e_u64)
278 .finish()
279 }
280 }
281
282 #[cfg(target_os = "android")]
283 #[test]
assert_close_on_exec_flag()284 fn assert_close_on_exec_flag() {
285 // This assertion need to be true for Selector::new.
286 assert_eq!(libc::O_CLOEXEC, libc::EPOLL_CLOEXEC);
287 }
288