• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #[cfg(any(target_os = "linux", target_os = "android"))]
2 mod eventfd {
3     use crate::sys::Selector;
4     use crate::{Interest, Token};
5 
6     use std::fs::File;
7     use std::io::{self, Read, Write};
8     use std::os::unix::io::FromRawFd;
9 
10     /// Waker backed by `eventfd`.
11     ///
12     /// `eventfd` is effectively an 64 bit counter. All writes must be of 8
13     /// bytes (64 bits) and are converted (native endian) into an 64 bit
14     /// unsigned integer and added to the count. Reads must also be 8 bytes and
15     /// reset the count to 0, returning the count.
16     #[derive(Debug)]
17     pub struct Waker {
18         fd: File,
19     }
20 
21     impl Waker {
new(selector: &Selector, token: Token) -> io::Result<Waker>22         pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
23             let fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
24             let file = unsafe { File::from_raw_fd(fd) };
25 
26             selector.register(fd, token, Interest::READABLE)?;
27             Ok(Waker { fd: file })
28         }
29 
wake(&self) -> io::Result<()>30         pub fn wake(&self) -> io::Result<()> {
31             let buf: [u8; 8] = 1u64.to_ne_bytes();
32             match (&self.fd).write(&buf) {
33                 Ok(_) => Ok(()),
34                 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
35                     // Writing only blocks if the counter is going to overflow.
36                     // So we'll reset the counter to 0 and wake it again.
37                     self.reset()?;
38                     self.wake()
39                 }
40                 Err(err) => Err(err),
41             }
42         }
43 
44         /// Reset the eventfd object, only need to call this if `wake` fails.
reset(&self) -> io::Result<()>45         fn reset(&self) -> io::Result<()> {
46             let mut buf: [u8; 8] = 0u64.to_ne_bytes();
47             match (&self.fd).read(&mut buf) {
48                 Ok(_) => Ok(()),
49                 // If the `Waker` hasn't been awoken yet this will return a
50                 // `WouldBlock` error which we can safely ignore.
51                 Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => Ok(()),
52                 Err(err) => Err(err),
53             }
54         }
55     }
56 }
57 
58 #[cfg(any(target_os = "linux", target_os = "android"))]
59 pub use self::eventfd::Waker;
60 
#[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
mod kqueue {
    use crate::sys::Selector;
    use crate::Token;

    use std::io;

    /// Waker backed by kqueue user space notifications (`EVFILT_USER`).
    ///
    /// The implementation is fairly simple. The waker needs its own access to
    /// the kqueue, so it holds a clone of the `Selector` obtained through
    /// `Selector::try_clone`. That kqueue is then set up to receive waker
    /// events by calling `Selector::setup_waker`. After that, waking is as
    /// simple as adding an event to the kqueue.
    #[derive(Debug)]
    pub struct Waker {
        selector: Selector,
        token: Token,
    }

    impl Waker {
        /// Creates a new `Waker` that wakes `selector` with events for
        /// `token`.
        ///
        /// # Errors
        ///
        /// Returns the error from cloning `selector` or from setting up the
        /// waker on the clone.
        pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
            let waker = Waker {
                selector: selector.try_clone()?,
                token,
            };
            // Arm the clone we own; if this fails `waker` (and with it the
            // cloned selector) is dropped before returning the error.
            waker.selector.setup_waker(token)?;
            Ok(waker)
        }

        /// Wakes up the selector by triggering an event for the token this
        /// `Waker` was created with.
        ///
        /// # Errors
        ///
        /// Returns the error reported by `Selector::wake`.
        pub fn wake(&self) -> io::Result<()> {
            let Waker { selector, token } = self;
            selector.wake(*token)
        }
    }
}
92 
93 #[cfg(any(target_os = "freebsd", target_os = "ios", target_os = "macos"))]
94 pub use self::kqueue::Waker;
95 
#[cfg(any(
    target_os = "dragonfly",
    target_os = "illumos",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "redox",
))]
mod pipe {
    use crate::sys::unix::Selector;
    use crate::{Interest, Token};

    use std::fs::File;
    use std::io::{self, Read, Write};
    use std::os::unix::io::FromRawFd;

    /// Waker backed by a unix pipe.
    ///
    /// The `Waker` owns both ends of the pipe: waking writes a single byte to
    /// the sending end, and the receiving end is drained whenever such a
    /// write fails because the pipe is full.
    #[derive(Debug)]
    pub struct Waker {
        sender: File,
        receiver: File,
    }

    impl Waker {
        /// Creates a new `Waker` and registers the reading end of its pipe
        /// with `selector`.
        ///
        /// Both ends of the pipe are created non-blocking and close-on-exec.
        /// The reading end is registered for readable interest under `token`.
        ///
        /// # Errors
        ///
        /// Returns the error reported when creating the pipe or when
        /// registering its reading end with `selector`.
        pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
            let mut fds = [-1; 2];
            syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?;
            let [read_fd, write_fd] = fds;

            // Wrap both descriptors in `File`s before registering, so that
            // they are closed again if registration fails.
            //
            // SAFETY: the `?` above returned early on failure, so both
            // descriptors are expected to be freshly created and not owned by
            // anything else. NOTE(review): relies on `syscall!` mapping
            // failures to `Err`.
            let sender = unsafe { File::from_raw_fd(write_fd) };
            let receiver = unsafe { File::from_raw_fd(read_fd) };

            selector.register(read_fd, token, Interest::READABLE)?;
            Ok(Waker { sender, receiver })
        }

        /// Wakes up the selector this `Waker` was registered with by writing
        /// a byte into the pipe.
        ///
        /// The write is retried when it is interrupted, and when the pipe is
        /// full (after draining the pipe).
        ///
        /// # Errors
        ///
        /// Returns any error from writing to the pipe other than `WouldBlock`
        /// and `Interrupted`.
        pub fn wake(&self) -> io::Result<()> {
            loop {
                // The epoll emulation on some illumos systems currently
                // requires the pipe buffer to be completely empty for an
                // edge-triggered wakeup on the pipe read side.
                #[cfg(target_os = "illumos")]
                self.empty();

                match (&self.sender).write(&[1]) {
                    Ok(_) => return Ok(()),
                    Err(err) => match err.kind() {
                        // The pipe is full: drain it and write again.
                        io::ErrorKind::WouldBlock => self.empty(),
                        // Interrupted before anything was written: simply
                        // write again.
                        io::ErrorKind::Interrupted => {}
                        _ => return Err(err),
                    },
                }
            }
        }

        /// Drains the pipe's buffer; apart from the illumos workaround in
        /// `wake` this is only needed when writing to the pipe fails.
        ///
        /// Reading stops at the first empty read or error; errors are
        /// ignored.
        fn empty(&self) {
            let mut scratch = [0; 4096];
            while let Ok(n) = (&self.receiver).read(&mut scratch) {
                if n == 0 {
                    break;
                }
            }
        }
    }
}
165 
166 #[cfg(any(
167     target_os = "dragonfly",
168     target_os = "illumos",
169     target_os = "netbsd",
170     target_os = "openbsd",
171     target_os = "redox",
172 ))]
173 pub use self::pipe::Waker;
174