• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::{Interest, Token};
2 
3 use libc::{EPOLLET, EPOLLIN, EPOLLOUT, EPOLLPRI, EPOLLRDHUP};
4 use log::error;
5 use std::os::unix::io::{AsRawFd, RawFd};
6 #[cfg(debug_assertions)]
7 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
8 use std::time::Duration;
9 use std::{cmp, i32, io, ptr};
10 
/// Unique id for use as `SelectorId`.
///
/// Debug builds only. The counter starts at 1 and `Selector::new` takes the
/// next value with a relaxed `fetch_add`, so every freshly created selector
/// gets a distinct id.
#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
14 
/// Thin wrapper around an epoll file descriptor.
///
/// The descriptor is closed when the `Selector` is dropped.
#[derive(Debug)]
pub struct Selector {
    /// Identifier taken from `NEXT_ID` on creation; clones made through
    /// `try_clone` reuse the id of the selector they were cloned from.
    /// Only present in debug builds.
    #[cfg(debug_assertions)]
    id: usize,
    /// The epoll file descriptor all operations are issued against.
    ep: RawFd,
    /// Set to `true` by `register_waker`. Only present in debug builds.
    #[cfg(debug_assertions)]
    has_waker: AtomicBool,
}
23 
24 impl Selector {
new() -> io::Result<Selector>25     pub fn new() -> io::Result<Selector> {
26         #[cfg(not(target_os = "android"))]
27         let res = syscall!(epoll_create1(libc::EPOLL_CLOEXEC));
28 
29         // On Android < API level 16 `epoll_create1` is not defined, so use a
30         // raw system call.
31         // According to libuv, `EPOLL_CLOEXEC` is not defined on Android API <
32         // 21. But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform,
33         // so we use it instead.
34         #[cfg(target_os = "android")]
35         let res = syscall!(syscall(libc::SYS_epoll_create1, libc::O_CLOEXEC));
36 
37         let ep = match res {
38             Ok(ep) => ep as RawFd,
39             Err(err) => {
40                 // When `epoll_create1` is not available fall back to use
41                 // `epoll_create` followed by `fcntl`.
42                 if let Some(libc::ENOSYS) = err.raw_os_error() {
43                     match syscall!(epoll_create(1024)) {
44                         Ok(ep) => match syscall!(fcntl(ep, libc::F_SETFD, libc::FD_CLOEXEC)) {
45                             Ok(ep) => ep as RawFd,
46                             Err(err) => {
47                                 // `fcntl` failed, cleanup `ep`.
48                                 let _ = unsafe { libc::close(ep) };
49                                 return Err(err);
50                             }
51                         },
52                         Err(err) => return Err(err),
53                     }
54                 } else {
55                     return Err(err);
56                 }
57             }
58         };
59 
60         Ok(Selector {
61             #[cfg(debug_assertions)]
62             id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
63             ep,
64             #[cfg(debug_assertions)]
65             has_waker: AtomicBool::new(false),
66         })
67     }
68 
try_clone(&self) -> io::Result<Selector>69     pub fn try_clone(&self) -> io::Result<Selector> {
70         syscall!(fcntl(self.ep, libc::F_DUPFD_CLOEXEC, super::LOWEST_FD)).map(|ep| Selector {
71             // It's the same selector, so we use the same id.
72             #[cfg(debug_assertions)]
73             id: self.id,
74             ep,
75             #[cfg(debug_assertions)]
76             has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
77         })
78     }
79 
select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()>80     pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
81         // A bug in kernels < 2.6.37 makes timeouts larger than LONG_MAX / CONFIG_HZ
82         // (approx. 30 minutes with CONFIG_HZ=1200) effectively infinite on 32 bits
83         // architectures. The magic number is the same constant used by libuv.
84         #[cfg(target_pointer_width = "32")]
85         const MAX_SAFE_TIMEOUT: u128 = 1789569;
86         #[cfg(not(target_pointer_width = "32"))]
87         const MAX_SAFE_TIMEOUT: u128 = libc::c_int::max_value() as u128;
88 
89         let timeout = timeout
90             .map(|to| {
91                 // `Duration::as_millis` truncates, so round up. This avoids
92                 // turning sub-millisecond timeouts into a zero timeout, unless
93                 // the caller explicitly requests that by specifying a zero
94                 // timeout.
95                 let to_ms = (to + Duration::from_nanos(999_999)).as_millis();
96                 cmp::min(MAX_SAFE_TIMEOUT, to_ms) as libc::c_int
97             })
98             .unwrap_or(-1);
99 
100         events.clear();
101         syscall!(epoll_wait(
102             self.ep,
103             events.as_mut_ptr(),
104             events.capacity() as i32,
105             timeout,
106         ))
107         .map(|n_events| {
108             // This is safe because `epoll_wait` ensures that `n_events` are
109             // assigned.
110             unsafe { events.set_len(n_events as usize) };
111         })
112     }
113 
register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>114     pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
115         let mut event = libc::epoll_event {
116             events: interests_to_epoll(interests),
117             u64: usize::from(token) as u64,
118             #[cfg(target_os = "redox")]
119             _pad: 0,
120         };
121 
122         syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_ADD, fd, &mut event)).map(|_| ())
123     }
124 
reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()>125     pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
126         let mut event = libc::epoll_event {
127             events: interests_to_epoll(interests),
128             u64: usize::from(token) as u64,
129             #[cfg(target_os = "redox")]
130             _pad: 0,
131         };
132 
133         syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_MOD, fd, &mut event)).map(|_| ())
134     }
135 
deregister(&self, fd: RawFd) -> io::Result<()>136     pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
137         syscall!(epoll_ctl(self.ep, libc::EPOLL_CTL_DEL, fd, ptr::null_mut())).map(|_| ())
138     }
139 
140     #[cfg(debug_assertions)]
register_waker(&self) -> bool141     pub fn register_waker(&self) -> bool {
142         self.has_waker.swap(true, Ordering::AcqRel)
143     }
144 }
145 
// NOTE(review): `cfg_io_source!` is defined elsewhere; presumably it gates
// its contents on I/O source support being enabled — confirm against the
// macro definition.
cfg_io_source! {
    impl Selector {
        /// Returns the identifier stored in this selector.
        ///
        /// Only available in debug builds.
        #[cfg(debug_assertions)]
        pub fn id(&self) -> usize {
            self.id
        }
    }
}
154 
impl AsRawFd for Selector {
    /// Returns the underlying epoll file descriptor without giving up
    /// ownership; the `Selector` still closes it on drop.
    fn as_raw_fd(&self) -> RawFd {
        self.ep
    }
}
160 
161 impl Drop for Selector {
drop(&mut self)162     fn drop(&mut self) {
163         if let Err(err) = syscall!(close(self.ep)) {
164             error!("error closing epoll: {}", err);
165         }
166     }
167 }
168 
interests_to_epoll(interests: Interest) -> u32169 fn interests_to_epoll(interests: Interest) -> u32 {
170     let mut kind = EPOLLET;
171 
172     if interests.is_readable() {
173         kind = kind | EPOLLIN | EPOLLRDHUP;
174     }
175 
176     if interests.is_writable() {
177         kind |= EPOLLOUT;
178     }
179 
180     if interests.is_priority() {
181         kind |= EPOLLPRI;
182     }
183 
184     kind as u32
185 }
186 
/// A single readiness event, exactly as filled in by `epoll_wait`.
pub type Event = libc::epoll_event;
/// Buffer of events; `Selector::select` fills it up to its capacity.
pub type Events = Vec<Event>;
189 
190 pub mod event {
191     use std::fmt;
192 
193     use crate::sys::Event;
194     use crate::Token;
195 
token(event: &Event) -> Token196     pub fn token(event: &Event) -> Token {
197         Token(event.u64 as usize)
198     }
199 
is_readable(event: &Event) -> bool200     pub fn is_readable(event: &Event) -> bool {
201         (event.events as libc::c_int & libc::EPOLLIN) != 0
202             || (event.events as libc::c_int & libc::EPOLLPRI) != 0
203     }
204 
is_writable(event: &Event) -> bool205     pub fn is_writable(event: &Event) -> bool {
206         (event.events as libc::c_int & libc::EPOLLOUT) != 0
207     }
208 
is_error(event: &Event) -> bool209     pub fn is_error(event: &Event) -> bool {
210         (event.events as libc::c_int & libc::EPOLLERR) != 0
211     }
212 
is_read_closed(event: &Event) -> bool213     pub fn is_read_closed(event: &Event) -> bool {
214         // Both halves of the socket have closed
215         event.events as libc::c_int & libc::EPOLLHUP != 0
216             // Socket has received FIN or called shutdown(SHUT_RD)
217             || (event.events as libc::c_int & libc::EPOLLIN != 0
218                 && event.events as libc::c_int & libc::EPOLLRDHUP != 0)
219     }
220 
is_write_closed(event: &Event) -> bool221     pub fn is_write_closed(event: &Event) -> bool {
222         // Both halves of the socket have closed
223         event.events as libc::c_int & libc::EPOLLHUP != 0
224             // Unix pipe write end has closed
225             || (event.events as libc::c_int & libc::EPOLLOUT != 0
226                 && event.events as libc::c_int & libc::EPOLLERR != 0)
227             // The other side (read end) of a Unix pipe has closed.
228             || event.events as libc::c_int == libc::EPOLLERR
229     }
230 
is_priority(event: &Event) -> bool231     pub fn is_priority(event: &Event) -> bool {
232         (event.events as libc::c_int & libc::EPOLLPRI) != 0
233     }
234 
is_aio(_: &Event) -> bool235     pub fn is_aio(_: &Event) -> bool {
236         // Not supported in the kernel, only in libc.
237         false
238     }
239 
is_lio(_: &Event) -> bool240     pub fn is_lio(_: &Event) -> bool {
241         // Not supported.
242         false
243     }
244 
debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result245     pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
246         #[allow(clippy::trivially_copy_pass_by_ref)]
247         fn check_events(got: &u32, want: &libc::c_int) -> bool {
248             (*got as libc::c_int & want) != 0
249         }
250         debug_detail!(
251             EventsDetails(u32),
252             check_events,
253             libc::EPOLLIN,
254             libc::EPOLLPRI,
255             libc::EPOLLOUT,
256             libc::EPOLLRDNORM,
257             libc::EPOLLRDBAND,
258             libc::EPOLLWRNORM,
259             libc::EPOLLWRBAND,
260             libc::EPOLLMSG,
261             libc::EPOLLERR,
262             libc::EPOLLHUP,
263             libc::EPOLLET,
264             libc::EPOLLRDHUP,
265             libc::EPOLLONESHOT,
266             #[cfg(target_os = "linux")]
267             libc::EPOLLEXCLUSIVE,
268             #[cfg(any(target_os = "android", target_os = "linux"))]
269             libc::EPOLLWAKEUP,
270             libc::EPOLL_CLOEXEC,
271         );
272 
273         // Can't reference fields in packed structures.
274         let e_u64 = event.u64;
275         f.debug_struct("epoll_event")
276             .field("events", &EventsDetails(event.events))
277             .field("u64", &e_u64)
278             .finish()
279     }
280 }
281 
#[cfg(target_os = "android")]
#[test]
fn assert_close_on_exec_flag() {
    // This assertion needs to hold for `Selector::new`: on Android it passes
    // `O_CLOEXEC` to the raw `epoll_create1` syscall in place of
    // `EPOLL_CLOEXEC`, which is only correct if both have the same value.
    assert_eq!(libc::O_CLOEXEC, libc::EPOLL_CLOEXEC);
}
288