1 // Copyright 2017 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::cell::{Cell, Ref, RefCell}; 6 use std::cmp::min; 7 use std::fs::File; 8 use std::i32; 9 use std::i64; 10 use std::marker::PhantomData; 11 use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; 12 use std::ptr::null_mut; 13 use std::slice; 14 use std::thread; 15 use std::time::Duration; 16 17 use libc::{ 18 c_int, epoll_create1, epoll_ctl, epoll_event, epoll_wait, EPOLLHUP, EPOLLIN, EPOLLOUT, 19 EPOLL_CLOEXEC, EPOLL_CTL_ADD, EPOLL_CTL_DEL, EPOLL_CTL_MOD, 20 }; 21 22 use crate::{errno_result, Result}; 23 24 const POLL_CONTEXT_MAX_EVENTS: usize = 16; 25 26 /// EpollEvents wraps raw epoll_events, it should only be used with EpollContext. 27 pub struct EpollEvents(RefCell<[epoll_event; POLL_CONTEXT_MAX_EVENTS]>); 28 29 impl EpollEvents { new() -> EpollEvents30 pub fn new() -> EpollEvents { 31 EpollEvents(RefCell::new( 32 [epoll_event { events: 0, u64: 0 }; POLL_CONTEXT_MAX_EVENTS], 33 )) 34 } 35 } 36 37 impl Default for EpollEvents { default() -> EpollEvents38 fn default() -> EpollEvents { 39 Self::new() 40 } 41 } 42 43 /// Trait for a token that can be associated with an `fd` in a `PollContext`. 44 /// 45 /// Simple enums that have no or primitive variant data data can use the `#[derive(PollToken)]` 46 /// custom derive to implement this trait. See 47 /// [poll_token_derive::poll_token](../poll_token_derive/fn.poll_token.html) for details. 48 pub trait PollToken { 49 /// Converts this token into a u64 that can be turned back into a token via `from_raw_token`. as_raw_token(&self) -> u6450 fn as_raw_token(&self) -> u64; 51 52 /// Converts a raw token as returned from `as_raw_token` back into a token. 53 /// 54 /// It is invalid to give a raw token that was not returned via `as_raw_token` from the same 55 /// `Self`. The implementation can expect that this will never happen as a result of its usage 56 /// in `PollContext`. from_raw_token(data: u64) -> Self57 fn from_raw_token(data: u64) -> Self; 58 } 59 60 impl PollToken for usize { as_raw_token(&self) -> u6461 fn as_raw_token(&self) -> u64 { 62 *self as u64 63 } 64 from_raw_token(data: u64) -> Self65 fn from_raw_token(data: u64) -> Self { 66 data as Self 67 } 68 } 69 70 impl PollToken for u64 { as_raw_token(&self) -> u6471 fn as_raw_token(&self) -> u64 { 72 *self as u64 73 } 74 from_raw_token(data: u64) -> Self75 fn from_raw_token(data: u64) -> Self { 76 data as Self 77 } 78 } 79 80 impl PollToken for u32 { as_raw_token(&self) -> u6481 fn as_raw_token(&self) -> u64 { 82 u64::from(*self) 83 } 84 from_raw_token(data: u64) -> Self85 fn from_raw_token(data: u64) -> Self { 86 data as Self 87 } 88 } 89 90 impl PollToken for u16 { as_raw_token(&self) -> u6491 fn as_raw_token(&self) -> u64 { 92 u64::from(*self) 93 } 94 from_raw_token(data: u64) -> Self95 fn from_raw_token(data: u64) -> Self { 96 data as Self 97 } 98 } 99 100 impl PollToken for u8 { as_raw_token(&self) -> u64101 fn as_raw_token(&self) -> u64 { 102 u64::from(*self) 103 } 104 from_raw_token(data: u64) -> Self105 fn from_raw_token(data: u64) -> Self { 106 data as Self 107 } 108 } 109 110 impl PollToken for () { as_raw_token(&self) -> u64111 fn as_raw_token(&self) -> u64 { 112 0 113 } 114 from_raw_token(_data: u64) -> Self115 fn from_raw_token(_data: u64) -> Self {} 116 } 117 118 /// An event returned by `PollContext::wait`. 119 pub struct PollEvent<'a, T> { 120 event: &'a epoll_event, 121 token: PhantomData<T>, // Needed to satisfy usage of T 122 } 123 124 impl<'a, T: PollToken> PollEvent<'a, T> { 125 /// Gets the token associated in `PollContext::add` with this event. token(&self) -> T126 pub fn token(&self) -> T { 127 T::from_raw_token(self.event.u64) 128 } 129 130 /// True if the `fd` associated with this token in `PollContext::add` is readable. readable(&self) -> bool131 pub fn readable(&self) -> bool { 132 self.event.events & (EPOLLIN as u32) != 0 133 } 134 135 /// True if the `fd` associated with this token in `PollContext::add` is writable. writable(&self) -> bool136 pub fn writable(&self) -> bool { 137 self.event.events & (EPOLLOUT as u32) != 0 138 } 139 140 /// True if the `fd` associated with this token in `PollContext::add` has been hungup on. hungup(&self) -> bool141 pub fn hungup(&self) -> bool { 142 self.event.events & (EPOLLHUP as u32) != 0 143 } 144 } 145 146 /// An iterator over some (sub)set of events returned by `PollContext::wait`. 147 pub struct PollEventIter<'a, I, T> 148 where 149 I: Iterator<Item = &'a epoll_event>, 150 { 151 mask: u32, 152 iter: I, 153 tokens: PhantomData<[T]>, // Needed to satisfy usage of T 154 } 155 156 impl<'a, I, T> Iterator for PollEventIter<'a, I, T> 157 where 158 I: Iterator<Item = &'a epoll_event>, 159 T: PollToken, 160 { 161 type Item = PollEvent<'a, T>; next(&mut self) -> Option<Self::Item>162 fn next(&mut self) -> Option<Self::Item> { 163 let mask = self.mask; 164 self.iter 165 .find(|event| (event.events & mask) != 0) 166 .map(|event| PollEvent { 167 event, 168 token: PhantomData, 169 }) 170 } 171 } 172 173 /// The list of event returned by `PollContext::wait`. 174 pub struct PollEvents<'a, T> { 175 count: usize, 176 events: Ref<'a, [epoll_event; POLL_CONTEXT_MAX_EVENTS]>, 177 tokens: PhantomData<[T]>, // Needed to satisfy usage of T 178 } 179 180 impl<'a, T: PollToken> PollEvents<'a, T> { 181 /// Copies the events to an owned structure so the reference to this (and by extension 182 /// `PollContext`) can be dropped. to_owned(&self) -> PollEventsOwned<T>183 pub fn to_owned(&self) -> PollEventsOwned<T> { 184 PollEventsOwned { 185 count: self.count, 186 events: RefCell::new(*self.events), 187 tokens: PhantomData, 188 } 189 } 190 191 /// Iterates over each event. iter(&self) -> PollEventIter<slice::Iter<epoll_event>, T>192 pub fn iter(&self) -> PollEventIter<slice::Iter<epoll_event>, T> { 193 PollEventIter { 194 mask: 0xffff_ffff, 195 iter: self.events[..self.count].iter(), 196 tokens: PhantomData, 197 } 198 } 199 200 /// Iterates over each readable event. iter_readable(&self) -> PollEventIter<slice::Iter<epoll_event>, T>201 pub fn iter_readable(&self) -> PollEventIter<slice::Iter<epoll_event>, T> { 202 PollEventIter { 203 mask: EPOLLIN as u32, 204 iter: self.events[..self.count].iter(), 205 tokens: PhantomData, 206 } 207 } 208 209 /// Iterates over each writable event. iter_writable(&self) -> PollEventIter<slice::Iter<epoll_event>, T>210 pub fn iter_writable(&self) -> PollEventIter<slice::Iter<epoll_event>, T> { 211 PollEventIter { 212 mask: EPOLLOUT as u32, 213 iter: self.events[..self.count].iter(), 214 tokens: PhantomData, 215 } 216 } 217 218 /// Iterates over each hungup event. iter_hungup(&self) -> PollEventIter<slice::Iter<epoll_event>, T>219 pub fn iter_hungup(&self) -> PollEventIter<slice::Iter<epoll_event>, T> { 220 PollEventIter { 221 mask: EPOLLHUP as u32, 222 iter: self.events[..self.count].iter(), 223 tokens: PhantomData, 224 } 225 } 226 } 227 228 impl<'a, T: PollToken> IntoIterator for &'a PollEvents<'_, T> { 229 type Item = PollEvent<'a, T>; 230 type IntoIter = PollEventIter<'a, slice::Iter<'a, epoll_event>, T>; 231 into_iter(self) -> Self::IntoIter232 fn into_iter(self) -> Self::IntoIter { 233 self.iter() 234 } 235 } 236 237 /// A deep copy of the event records from `PollEvents`. 238 pub struct PollEventsOwned<T> { 239 count: usize, 240 events: RefCell<[epoll_event; POLL_CONTEXT_MAX_EVENTS]>, 241 tokens: PhantomData<T>, // Needed to satisfy usage of T 242 } 243 244 impl<T: PollToken> PollEventsOwned<T> { 245 /// Takes a reference to the events so that they can be iterated via methods in `PollEvents`. as_ref(&self) -> PollEvents<T>246 pub fn as_ref(&self) -> PollEvents<T> { 247 PollEvents { 248 count: self.count, 249 events: self.events.borrow(), 250 tokens: PhantomData, 251 } 252 } 253 } 254 255 /// Watching events taken by PollContext. 256 pub struct WatchingEvents(u32); 257 258 impl WatchingEvents { 259 /// Returns empty Events. 260 #[inline(always)] empty() -> WatchingEvents261 pub fn empty() -> WatchingEvents { 262 WatchingEvents(0) 263 } 264 265 /// Build Events from raw epoll events (defined in epoll_ctl(2)). 266 #[inline(always)] new(raw: u32) -> WatchingEvents267 pub fn new(raw: u32) -> WatchingEvents { 268 WatchingEvents(raw) 269 } 270 271 /// Set read events. 272 #[inline(always)] set_read(self) -> WatchingEvents273 pub fn set_read(self) -> WatchingEvents { 274 WatchingEvents(self.0 | EPOLLIN as u32) 275 } 276 277 /// Set write events. 278 #[inline(always)] set_write(self) -> WatchingEvents279 pub fn set_write(self) -> WatchingEvents { 280 WatchingEvents(self.0 | EPOLLOUT as u32) 281 } 282 283 /// Get the underlying epoll events. get_raw(&self) -> u32284 pub fn get_raw(&self) -> u32 { 285 self.0 286 } 287 } 288 289 /// EpollContext wraps linux epoll. It provides similar interface to PollContext. 290 /// It is thread safe while PollContext is not. It requires user to pass in a reference of 291 /// EpollEvents while PollContext does not. Always use PollContext if you don't need to access the 292 /// same epoll from different threads. 293 pub struct EpollContext<T> { 294 epoll_ctx: File, 295 // Needed to satisfy usage of T 296 tokens: PhantomData<[T]>, 297 } 298 299 impl<T: PollToken> EpollContext<T> { 300 /// Creates a new `EpollContext`. new() -> Result<EpollContext<T>>301 pub fn new() -> Result<EpollContext<T>> { 302 // Safe because we check the return value. 303 let epoll_fd = unsafe { epoll_create1(EPOLL_CLOEXEC) }; 304 if epoll_fd < 0 { 305 return errno_result(); 306 } 307 Ok(EpollContext { 308 epoll_ctx: unsafe { File::from_raw_fd(epoll_fd) }, 309 tokens: PhantomData, 310 }) 311 } 312 313 /// Creates a new `EpollContext` and adds the slice of `fd` and `token` tuples to the new 314 /// context. 315 /// 316 /// This is equivalent to calling `new` followed by `add_many`. If there is an error, this will 317 /// return the error instead of the new context. build_with(fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<EpollContext<T>>318 pub fn build_with(fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<EpollContext<T>> { 319 let ctx = EpollContext::new()?; 320 ctx.add_many(fd_tokens)?; 321 Ok(ctx) 322 } 323 324 /// Adds the given slice of `fd` and `token` tuples to this context. 325 /// 326 /// This is equivalent to calling `add` with each `fd` and `token`. If there are any errors, 327 /// this method will stop adding `fd`s and return the first error, leaving this context in a 328 /// undefined state. add_many(&self, fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<()>329 pub fn add_many(&self, fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<()> { 330 for (fd, token) in fd_tokens { 331 self.add(*fd, T::from_raw_token(token.as_raw_token()))?; 332 } 333 Ok(()) 334 } 335 336 /// Adds the given `fd` to this context and associates the given `token` with the `fd`'s 337 /// readable events. 338 /// 339 /// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and 340 /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different 341 /// FD number) added to this context, events will not be reported by `wait` anymore. add(&self, fd: &dyn AsRawFd, token: T) -> Result<()>342 pub fn add(&self, fd: &dyn AsRawFd, token: T) -> Result<()> { 343 self.add_fd_with_events(fd, WatchingEvents::empty().set_read(), token) 344 } 345 346 /// Adds the given `fd` to this context, watching for the specified events and associates the 347 /// given 'token' with those events. 348 /// 349 /// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and 350 /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different 351 /// FD number) added to this context, events will not be reported by `wait` anymore. add_fd_with_events( &self, fd: &dyn AsRawFd, events: WatchingEvents, token: T, ) -> Result<()>352 pub fn add_fd_with_events( 353 &self, 354 fd: &dyn AsRawFd, 355 events: WatchingEvents, 356 token: T, 357 ) -> Result<()> { 358 let mut evt = epoll_event { 359 events: events.get_raw(), 360 u64: token.as_raw_token(), 361 }; 362 // Safe because we give a valid epoll FD and FD to watch, as well as a valid epoll_event 363 // structure. Then we check the return value. 364 let ret = unsafe { 365 epoll_ctl( 366 self.epoll_ctx.as_raw_fd(), 367 EPOLL_CTL_ADD, 368 fd.as_raw_fd(), 369 &mut evt, 370 ) 371 }; 372 if ret < 0 { 373 return errno_result(); 374 }; 375 Ok(()) 376 } 377 378 /// If `fd` was previously added to this context, the watched events will be replaced with 379 /// `events` and the token associated with it will be replaced with the given `token`. modify(&self, fd: &dyn AsRawFd, events: WatchingEvents, token: T) -> Result<()>380 pub fn modify(&self, fd: &dyn AsRawFd, events: WatchingEvents, token: T) -> Result<()> { 381 let mut evt = epoll_event { 382 events: events.0, 383 u64: token.as_raw_token(), 384 }; 385 // Safe because we give a valid epoll FD and FD to modify, as well as a valid epoll_event 386 // structure. Then we check the return value. 387 let ret = unsafe { 388 epoll_ctl( 389 self.epoll_ctx.as_raw_fd(), 390 EPOLL_CTL_MOD, 391 fd.as_raw_fd(), 392 &mut evt, 393 ) 394 }; 395 if ret < 0 { 396 return errno_result(); 397 }; 398 Ok(()) 399 } 400 401 /// Deletes the given `fd` from this context. 402 /// 403 /// If an `fd`'s token shows up in the list of hangup events, it should be removed using this 404 /// method or by closing/dropping (if and only if the fd was never dup()'d/fork()'d) the `fd`. 405 /// Failure to do so will cause the `wait` method to always return immediately, causing ~100% 406 /// CPU load. delete(&self, fd: &dyn AsRawFd) -> Result<()>407 pub fn delete(&self, fd: &dyn AsRawFd) -> Result<()> { 408 // Safe because we give a valid epoll FD and FD to stop watching. Then we check the return 409 // value. 410 let ret = unsafe { 411 epoll_ctl( 412 self.epoll_ctx.as_raw_fd(), 413 EPOLL_CTL_DEL, 414 fd.as_raw_fd(), 415 null_mut(), 416 ) 417 }; 418 if ret < 0 { 419 return errno_result(); 420 }; 421 Ok(()) 422 } 423 424 /// Waits for any events to occur in FDs that were previously added to this context. 425 /// 426 /// The events are level-triggered, meaning that if any events are unhandled (i.e. not reading 427 /// for readable events and not closing for hungup events), subsequent calls to `wait` will 428 /// return immediately. The consequence of not handling an event perpetually while calling 429 /// `wait` is that the callers loop will degenerated to busy loop polling, pinning a CPU to 430 /// ~100% usage. wait<'a>(&self, events: &'a EpollEvents) -> Result<PollEvents<'a, T>>431 pub fn wait<'a>(&self, events: &'a EpollEvents) -> Result<PollEvents<'a, T>> { 432 self.wait_timeout(events, Duration::new(i64::MAX as u64, 0)) 433 } 434 435 /// Like `wait` except will only block for a maximum of the given `timeout`. 436 /// 437 /// This may return earlier than `timeout` with zero events if the duration indicated exceeds 438 /// system limits. wait_timeout<'a>( &self, events: &'a EpollEvents, timeout: Duration, ) -> Result<PollEvents<'a, T>>439 pub fn wait_timeout<'a>( 440 &self, 441 events: &'a EpollEvents, 442 timeout: Duration, 443 ) -> Result<PollEvents<'a, T>> { 444 let timeout_millis = if timeout.as_secs() as i64 == i64::max_value() { 445 // We make the convenient assumption that 2^63 seconds is an effectively unbounded time 446 // frame. This is meant to mesh with `wait` calling us with no timeout. 447 -1 448 } else { 449 // In cases where we the number of milliseconds would overflow an i32, we substitute the 450 // maximum timeout which is ~24.8 days. 451 let millis = timeout 452 .as_secs() 453 .checked_mul(1_000) 454 .and_then(|ms| ms.checked_add(u64::from(timeout.subsec_nanos()) / 1_000_000)) 455 .unwrap_or(i32::max_value() as u64); 456 min(i32::max_value() as u64, millis) as i32 457 }; 458 let ret = { 459 let mut epoll_events = events.0.borrow_mut(); 460 let max_events = epoll_events.len() as c_int; 461 // Safe because we give an epoll context and a properly sized epoll_events array 462 // pointer, which we trust the kernel to fill in properly. 463 unsafe { 464 handle_eintr_errno!(epoll_wait( 465 self.epoll_ctx.as_raw_fd(), 466 &mut epoll_events[0], 467 max_events, 468 timeout_millis 469 )) 470 } 471 }; 472 if ret < 0 { 473 return errno_result(); 474 } 475 let epoll_events = events.0.borrow(); 476 let events = PollEvents { 477 count: ret as usize, 478 events: epoll_events, 479 tokens: PhantomData, 480 }; 481 Ok(events) 482 } 483 } 484 485 impl<T: PollToken> AsRawFd for EpollContext<T> { as_raw_fd(&self) -> RawFd486 fn as_raw_fd(&self) -> RawFd { 487 self.epoll_ctx.as_raw_fd() 488 } 489 } 490 491 impl<T: PollToken> IntoRawFd for EpollContext<T> { into_raw_fd(self) -> RawFd492 fn into_raw_fd(self) -> RawFd { 493 self.epoll_ctx.into_raw_fd() 494 } 495 } 496 497 /// Used to poll multiple objects that have file descriptors. 498 /// 499 /// # Example 500 /// 501 /// ``` 502 /// # use sys_util::{Result, EventFd, PollContext, PollEvents}; 503 /// # fn test() -> Result<()> { 504 /// let evt1 = EventFd::new()?; 505 /// let evt2 = EventFd::new()?; 506 /// evt2.write(1)?; 507 /// 508 /// let ctx: PollContext<u32> = PollContext::new()?; 509 /// ctx.add(&evt1, 1)?; 510 /// ctx.add(&evt2, 2)?; 511 /// 512 /// let pollevents: PollEvents<u32> = ctx.wait()?; 513 /// let tokens: Vec<u32> = pollevents.iter_readable().map(|e| e.token()).collect(); 514 /// assert_eq!(&tokens[..], &[2]); 515 /// # Ok(()) 516 /// # } 517 /// ``` 518 pub struct PollContext<T> { 519 epoll_ctx: EpollContext<T>, 520 521 // We use a RefCell here so that the `wait` method only requires an immutable self reference 522 // while returning the events (encapsulated by PollEvents). Without the RefCell, `wait` would 523 // hold a mutable reference that lives as long as its returned reference (i.e. the PollEvents), 524 // even though that reference is immutable. This is terribly inconvenient for the caller because 525 // the borrow checking would prevent them from using `delete` and `add` while the events are in 526 // scope. 527 events: EpollEvents, 528 529 // Hangup busy loop detection variables. See `check_for_hungup_busy_loop`. 530 hangups: Cell<usize>, 531 max_hangups: Cell<usize>, 532 } 533 534 impl<T: PollToken> PollContext<T> { 535 /// Creates a new `PollContext`. new() -> Result<PollContext<T>>536 pub fn new() -> Result<PollContext<T>> { 537 Ok(PollContext { 538 epoll_ctx: EpollContext::new()?, 539 events: EpollEvents::new(), 540 hangups: Cell::new(0), 541 max_hangups: Cell::new(0), 542 }) 543 } 544 545 /// Creates a new `PollContext` and adds the slice of `fd` and `token` tuples to the new 546 /// context. 547 /// 548 /// This is equivalent to calling `new` followed by `add_many`. If there is an error, this will 549 /// return the error instead of the new context. build_with(fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<PollContext<T>>550 pub fn build_with(fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<PollContext<T>> { 551 let ctx = PollContext::new()?; 552 ctx.add_many(fd_tokens)?; 553 Ok(ctx) 554 } 555 556 /// Adds the given slice of `fd` and `token` tuples to this context. 557 /// 558 /// This is equivalent to calling `add` with each `fd` and `token`. If there are any errors, 559 /// this method will stop adding `fd`s and return the first error, leaving this context in a 560 /// undefined state. add_many(&self, fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<()>561 pub fn add_many(&self, fd_tokens: &[(&dyn AsRawFd, T)]) -> Result<()> { 562 for (fd, token) in fd_tokens { 563 self.add(*fd, T::from_raw_token(token.as_raw_token()))?; 564 } 565 Ok(()) 566 } 567 568 /// Adds the given `fd` to this context and associates the given `token` with the `fd`'s 569 /// readable events. 570 /// 571 /// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and 572 /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different 573 /// FD number) added to this context, events will not be reported by `wait` anymore. add(&self, fd: &dyn AsRawFd, token: T) -> Result<()>574 pub fn add(&self, fd: &dyn AsRawFd, token: T) -> Result<()> { 575 self.add_fd_with_events(fd, WatchingEvents::empty().set_read(), token) 576 } 577 578 /// Adds the given `fd` to this context, watching for the specified events and associates the 579 /// given 'token' with those events. 580 /// 581 /// A `fd` can only be added once and does not need to be kept open. If the `fd` is dropped and 582 /// there were no duplicated file descriptors (i.e. adding the same descriptor with a different 583 /// FD number) added to this context, events will not be reported by `wait` anymore. add_fd_with_events( &self, fd: &dyn AsRawFd, events: WatchingEvents, token: T, ) -> Result<()>584 pub fn add_fd_with_events( 585 &self, 586 fd: &dyn AsRawFd, 587 events: WatchingEvents, 588 token: T, 589 ) -> Result<()> { 590 self.epoll_ctx.add_fd_with_events(fd, events, token)?; 591 self.hangups.set(0); 592 self.max_hangups.set(self.max_hangups.get() + 1); 593 Ok(()) 594 } 595 596 /// If `fd` was previously added to this context, the watched events will be replaced with 597 /// `events` and the token associated with it will be replaced with the given `token`. modify(&self, fd: &dyn AsRawFd, events: WatchingEvents, token: T) -> Result<()>598 pub fn modify(&self, fd: &dyn AsRawFd, events: WatchingEvents, token: T) -> Result<()> { 599 self.epoll_ctx.modify(fd, events, token) 600 } 601 602 /// Deletes the given `fd` from this context. 603 /// 604 /// If an `fd`'s token shows up in the list of hangup events, it should be removed using this 605 /// method or by closing/dropping (if and only if the fd was never dup()'d/fork()'d) the `fd`. 606 /// Failure to do so will cause the `wait` method to always return immediately, causing ~100% 607 /// CPU load. delete(&self, fd: &dyn AsRawFd) -> Result<()>608 pub fn delete(&self, fd: &dyn AsRawFd) -> Result<()> { 609 self.epoll_ctx.delete(fd)?; 610 self.hangups.set(0); 611 self.max_hangups.set(self.max_hangups.get() - 1); 612 Ok(()) 613 } 614 615 // This method determines if the the user of wait is misusing the `PollContext` by leaving FDs 616 // in this `PollContext` that have been shutdown or hungup on. Such an FD will cause `wait` to 617 // return instantly with a hungup event. If that FD is perpetually left in this context, a busy 618 // loop burning ~100% of one CPU will silently occur with no human visible malfunction. 619 // 620 // How do we know if the client of this context is ignoring hangups? A naive implementation 621 // would trigger if consecutive wait calls yield hangup events, but there are legitimate cases 622 // for this, such as two distinct sockets becoming hungup across two consecutive wait calls. A 623 // smarter implementation would only trigger if `delete` wasn't called between waits that 624 // yielded hangups. Sadly `delete` isn't the only way to remove an FD from this context. The 625 // other way is for the client to close the hungup FD, which automatically removes it from this 626 // context. Assuming that the client always uses close, this implementation would too eagerly 627 // trigger. 628 // 629 // The implementation used here keeps an upper bound of FDs in this context using a counter 630 // hooked into add/delete (which is imprecise because close can also remove FDs without us 631 // knowing). The number of consecutive (no add or delete in between) hangups yielded by wait 632 // calls is counted and compared to the upper bound. If the upper bound is exceeded by the 633 // consecutive hangups, the implementation triggers the check and logs. 634 // 635 // This implementation has false negatives because the upper bound can be completely too high, 636 // in the worst case caused by only using close instead of delete. However, this method has the 637 // advantage of always triggering eventually genuine busy loop cases, requires no dynamic 638 // allocations, is fast and constant time to compute, and has no false positives. check_for_hungup_busy_loop(&self, new_hangups: usize)639 fn check_for_hungup_busy_loop(&self, new_hangups: usize) { 640 let old_hangups = self.hangups.get(); 641 let max_hangups = self.max_hangups.get(); 642 if old_hangups <= max_hangups && old_hangups + new_hangups > max_hangups { 643 warn!( 644 "busy poll wait loop with hungup FDs detected on thread {}", 645 thread::current().name().unwrap_or("") 646 ); 647 // This panic is helpful for tests of this functionality. 648 #[cfg(test)] 649 panic!("hungup busy loop detected"); 650 } 651 self.hangups.set(old_hangups + new_hangups); 652 } 653 654 /// Waits for any events to occur in FDs that were previously added to this context. 655 /// 656 /// The events are level-triggered, meaning that if any events are unhandled (i.e. not reading 657 /// for readable events and not closing for hungup events), subsequent calls to `wait` will 658 /// return immediately. The consequence of not handling an event perpetually while calling 659 /// `wait` is that the callers loop will degenerated to busy loop polling, pinning a CPU to 660 /// ~100% usage. 661 /// 662 /// # Panics 663 /// Panics if the returned `PollEvents` structure is not dropped before subsequent `wait` calls. wait(&self) -> Result<PollEvents<T>>664 pub fn wait(&self) -> Result<PollEvents<T>> { 665 self.wait_timeout(Duration::new(i64::MAX as u64, 0)) 666 } 667 668 /// Like `wait` except will only block for a maximum of the given `timeout`. 669 /// 670 /// This may return earlier than `timeout` with zero events if the duration indicated exceeds 671 /// system limits. wait_timeout(&self, timeout: Duration) -> Result<PollEvents<T>>672 pub fn wait_timeout(&self, timeout: Duration) -> Result<PollEvents<T>> { 673 let events = self.epoll_ctx.wait_timeout(&self.events, timeout)?; 674 let hangups = events.iter_hungup().count(); 675 self.check_for_hungup_busy_loop(hangups); 676 Ok(events) 677 } 678 } 679 680 impl<T: PollToken> AsRawFd for PollContext<T> { as_raw_fd(&self) -> RawFd681 fn as_raw_fd(&self) -> RawFd { 682 self.epoll_ctx.as_raw_fd() 683 } 684 } 685 686 impl<T: PollToken> IntoRawFd for PollContext<T> { into_raw_fd(self) -> RawFd687 fn into_raw_fd(self) -> RawFd { 688 self.epoll_ctx.into_raw_fd() 689 } 690 } 691 692 #[cfg(test)] 693 mod tests { 694 use super::*; 695 use crate::EventFd; 696 use poll_token_derive::PollToken; 697 use std::os::unix::net::UnixStream; 698 use std::time::Instant; 699 700 #[test] poll_context()701 fn poll_context() { 702 let evt1 = EventFd::new().unwrap(); 703 let evt2 = EventFd::new().unwrap(); 704 evt1.write(1).unwrap(); 705 evt2.write(1).unwrap(); 706 let ctx: PollContext<u32> = PollContext::build_with(&[(&evt1, 1), (&evt2, 2)]).unwrap(); 707 708 let mut evt_count = 0; 709 while evt_count < 2 { 710 for event in ctx.wait().unwrap().iter_readable() { 711 evt_count += 1; 712 match event.token() { 713 1 => { 714 evt1.read().unwrap(); 715 ctx.delete(&evt1).unwrap(); 716 } 717 2 => { 718 evt2.read().unwrap(); 719 ctx.delete(&evt2).unwrap(); 720 } 721 _ => panic!("unexpected token"), 722 }; 723 } 724 } 725 assert_eq!(evt_count, 2); 726 } 727 728 #[test] poll_context_overflow()729 fn poll_context_overflow() { 730 const EVT_COUNT: usize = POLL_CONTEXT_MAX_EVENTS * 2 + 1; 731 let ctx: PollContext<usize> = PollContext::new().unwrap(); 732 let mut evts = Vec::with_capacity(EVT_COUNT); 733 for i in 0..EVT_COUNT { 734 let evt = EventFd::new().unwrap(); 735 evt.write(1).unwrap(); 736 ctx.add(&evt, i).unwrap(); 737 evts.push(evt); 738 } 739 let mut evt_count = 0; 740 while evt_count < EVT_COUNT { 741 for event in ctx.wait().unwrap().iter_readable() { 742 evts[event.token()].read().unwrap(); 743 evt_count += 1; 744 } 745 } 746 } 747 748 #[test] 749 #[should_panic] poll_context_hungup()750 fn poll_context_hungup() { 751 let (s1, s2) = UnixStream::pair().unwrap(); 752 let ctx: PollContext<u32> = PollContext::new().unwrap(); 753 ctx.add(&s1, 1).unwrap(); 754 755 // Causes s1 to receive hangup events, which we purposefully ignore to trip the detection 756 // logic in `PollContext`. 757 drop(s2); 758 759 // Should easily panic within this many iterations. 760 for _ in 0..1000 { 761 ctx.wait().unwrap(); 762 } 763 } 764 765 #[test] poll_context_timeout()766 fn poll_context_timeout() { 767 let ctx: PollContext<u32> = PollContext::new().unwrap(); 768 let dur = Duration::from_millis(10); 769 let start_inst = Instant::now(); 770 ctx.wait_timeout(dur).unwrap(); 771 assert!(start_inst.elapsed() >= dur); 772 } 773 774 #[test] 775 #[allow(dead_code)] poll_token_derive()776 fn poll_token_derive() { 777 #[derive(PollToken)] 778 enum EmptyToken {} 779 780 #[derive(PartialEq, Debug, PollToken)] 781 enum Token { 782 Alpha, 783 Beta, 784 // comments 785 Gamma(u32), 786 Delta { index: usize }, 787 Omega, 788 } 789 790 assert_eq!( 791 Token::from_raw_token(Token::Alpha.as_raw_token()), 792 Token::Alpha 793 ); 794 assert_eq!( 795 Token::from_raw_token(Token::Beta.as_raw_token()), 796 Token::Beta 797 ); 798 assert_eq!( 799 Token::from_raw_token(Token::Gamma(55).as_raw_token()), 800 Token::Gamma(55) 801 ); 802 assert_eq!( 803 Token::from_raw_token(Token::Delta { index: 100 }.as_raw_token()), 804 Token::Delta { index: 100 } 805 ); 806 assert_eq!( 807 Token::from_raw_token(Token::Omega.as_raw_token()), 808 Token::Omega 809 ); 810 } 811 } 812