1 #[cfg(any(target_os = "linux", target_os = "android"))] 2 mod eventfd { 3 use crate::sys::Selector; 4 use crate::{Interest, Token}; 5 6 use std::fs::File; 7 use std::io::{self, Read, Write}; 8 use std::os::unix::io::FromRawFd; 9 10 /// Waker backed by `eventfd`. 11 /// 12 /// `eventfd` is effectively an 64 bit counter. All writes must be of 8 13 /// bytes (64 bits) and are converted (native endian) into an 64 bit 14 /// unsigned integer and added to the count. Reads must also be 8 bytes and 15 /// reset the count to 0, returning the count. 16 #[derive(Debug)] 17 pub struct Waker { 18 fd: File, 19 } 20 21 impl Waker { new(selector: &Selector, token: Token) -> io::Result<Waker>22 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> { 23 let fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?; 24 let file = unsafe { File::from_raw_fd(fd) }; 25 26 selector.register(fd, token, Interest::READABLE)?; 27 Ok(Waker { fd: file }) 28 } 29 wake(&self) -> io::Result<()>30 pub fn wake(&self) -> io::Result<()> { 31 let buf: [u8; 8] = 1u64.to_ne_bytes(); 32 match (&self.fd).write(&buf) { 33 Ok(_) => Ok(()), 34 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { 35 // Writing only blocks if the counter is going to overflow. 36 // So we'll reset the counter to 0 and wake it again. 37 self.reset()?; 38 self.wake() 39 } 40 Err(err) => Err(err), 41 } 42 } 43 44 /// Reset the eventfd object, only need to call this if `wake` fails. reset(&self) -> io::Result<()>45 fn reset(&self) -> io::Result<()> { 46 let mut buf: [u8; 8] = 0u64.to_ne_bytes(); 47 match (&self.fd).read(&mut buf) { 48 Ok(_) => Ok(()), 49 // If the `Waker` hasn't been awoken yet this will return a 50 // `WouldBlock` error which we can safely ignore. 51 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()), 52 Err(err) => Err(err), 53 } 54 } 55 } 56 } 57 58 #[cfg(any(target_os = "linux", target_os = "android"))] 59 pub use self::eventfd::Waker; 60 61 #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] 62 mod kqueue { 63 use crate::sys::Selector; 64 use crate::Token; 65 66 use std::io; 67 68 /// Waker backed by kqueue user space notifications (`EVFILT_USER`). 69 /// 70 /// The implementation is fairly simple, first the kqueue must be setup to 71 /// receive waker events this done by calling `Selector.setup_waker`. Next 72 /// we need access to kqueue, thus we need to duplicate the file descriptor. 73 /// Now waking is as simple as adding an event to the kqueue. 74 #[derive(Debug)] 75 pub struct Waker { 76 selector: Selector, 77 token: Token, 78 } 79 80 impl Waker { new(selector: &Selector, token: Token) -> io::Result<Waker>81 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> { 82 let selector = selector.try_clone()?; 83 selector.setup_waker(token)?; 84 Ok(Waker { selector, token }) 85 } 86 wake(&self) -> io::Result<()>87 pub fn wake(&self) -> io::Result<()> { 88 self.selector.wake(self.token) 89 } 90 } 91 } 92 93 #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))] 94 pub use self::kqueue::Waker; 95 96 #[cfg(any( 97 target_os = "dragonfly", 98 target_os = "illumos", 99 target_os = "netbsd", 100 target_os = "openbsd", 101 target_os = "redox", 102 ))] 103 mod pipe { 104 use crate::sys::unix::Selector; 105 use crate::{Interest, Token}; 106 107 use std::fs::File; 108 use std::io::{self, Read, Write}; 109 use std::os::unix::io::FromRawFd; 110 111 /// Waker backed by a unix pipe. 112 /// 113 /// Waker controls both the sending and receiving ends and empties the pipe 114 /// if writing to it (waking) fails. 115 #[derive(Debug)] 116 pub struct Waker { 117 sender: File, 118 receiver: File, 119 } 120 121 impl Waker { new(selector: &Selector, token: Token) -> io::Result<Waker>122 pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> { 123 let mut fds = [-1; 2]; 124 syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; 125 let sender = unsafe { File::from_raw_fd(fds[1]) }; 126 let receiver = unsafe { File::from_raw_fd(fds[0]) }; 127 128 selector.register(fds[0], token, Interest::READABLE)?; 129 Ok(Waker { sender, receiver }) 130 } 131 wake(&self) -> io::Result<()>132 pub fn wake(&self) -> io::Result<()> { 133 // The epoll emulation on some illumos systems currently requires 134 // the pipe buffer to be completely empty for an edge-triggered 135 // wakeup on the pipe read side. 136 #[cfg(target_os = "illumos")] 137 self.empty(); 138 139 match (&self.sender).write(&[1]) { 140 Ok(_) => Ok(()), 141 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { 142 // The reading end is full so we'll empty the buffer and try 143 // again. 144 self.empty(); 145 self.wake() 146 } 147 Err(ref err) if err.kind() == io::ErrorKind::Interrupted => self.wake(), 148 Err(err) => Err(err), 149 } 150 } 151 152 /// Empty the pipe's buffer, only need to call this if `wake` fails. 153 /// This ignores any errors. empty(&self)154 fn empty(&self) { 155 let mut buf = [0; 4096]; 156 loop { 157 match (&self.receiver).read(&mut buf) { 158 Ok(n) if n > 0 => continue, 159 _ => return, 160 } 161 } 162 } 163 } 164 } 165 166 #[cfg(any( 167 target_os = "dragonfly", 168 target_os = "illumos", 169 target_os = "netbsd", 170 target_os = "openbsd", 171 target_os = "redox", 172 ))] 173 pub use self::pipe::Waker; 174