1 use std::ops::{Deref, DerefMut}; 2 #[cfg(unix)] 3 use std::os::unix::io::AsRawFd; 4 #[cfg(target_os = "wasi")] 5 use std::os::wasi::io::AsRawFd; 6 #[cfg(windows)] 7 use std::os::windows::io::AsRawSocket; 8 #[cfg(debug_assertions)] 9 use std::sync::atomic::{AtomicUsize, Ordering}; 10 use std::{fmt, io}; 11 12 use crate::sys::IoSourceState; 13 use crate::{event, Interest, Registry, Token}; 14 15 /// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`] 16 /// implementation. 17 /// 18 /// `IoSource` enables registering any FD or socket wrapper with [`Poll`]. 19 /// 20 /// While only implementations for TCP, UDP, and UDS (Unix only) are provided, 21 /// Mio supports registering any FD or socket that can be registered with the 22 /// underlying OS selector. `IoSource` provides the necessary bridge. 23 /// 24 /// [`RawFd`]: std::os::unix::io::RawFd 25 /// [`RawSocket`]: std::os::windows::io::RawSocket 26 /// 27 /// # Notes 28 /// 29 /// To handle the registrations and events properly **all** I/O operations (such 30 /// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the 31 /// internal state is updated accordingly. 32 /// 33 /// [`Poll`]: crate::Poll 34 /// [`do_io`]: IoSource::do_io 35 /* 36 /// 37 /// # Examples 38 /// 39 /// Basic usage. 40 /// 41 /// ``` 42 /// # use std::error::Error; 43 /// # fn main() -> Result<(), Box<dyn Error>> { 44 /// use mio::{Interest, Poll, Token}; 45 /// use mio::IoSource; 46 /// 47 /// use std::net; 48 /// 49 /// let poll = Poll::new()?; 50 /// 51 /// // Bind a std TCP listener. 52 /// let listener = net::TcpListener::bind("127.0.0.1:0")?; 53 /// // Wrap it in the `IoSource` type. 54 /// let mut listener = IoSource::new(listener); 55 /// 56 /// // Register the listener. 57 /// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?; 58 /// # Ok(()) 59 /// # } 60 /// ``` 61 */ 62 pub struct IoSource<T> { 63 state: IoSourceState, 64 inner: T, 65 #[cfg(debug_assertions)] 66 selector_id: SelectorId, 67 } 68 69 impl<T> IoSource<T> { 70 /// Create a new `IoSource`. new(io: T) -> IoSource<T>71 pub fn new(io: T) -> IoSource<T> { 72 IoSource { 73 state: IoSourceState::new(), 74 inner: io, 75 #[cfg(debug_assertions)] 76 selector_id: SelectorId::new(), 77 } 78 } 79 80 /// Execute an I/O operations ensuring that the socket receives more events 81 /// if it hits a [`WouldBlock`] error. 82 /// 83 /// # Notes 84 /// 85 /// This method is required to be called for **all** I/O operations to 86 /// ensure the user will receive events once the socket is ready again after 87 /// returning a [`WouldBlock`] error. 88 /// 89 /// [`WouldBlock`]: io::ErrorKind::WouldBlock do_io<F, R>(&self, f: F) -> io::Result<R> where F: FnOnce(&T) -> io::Result<R>,90 pub fn do_io<F, R>(&self, f: F) -> io::Result<R> 91 where 92 F: FnOnce(&T) -> io::Result<R>, 93 { 94 self.state.do_io(f, &self.inner) 95 } 96 97 /// Returns the I/O source, dropping the state. 98 /// 99 /// # Notes 100 /// 101 /// To ensure no more events are to be received for this I/O source first 102 /// [`deregister`] it. 103 /// 104 /// [`deregister`]: Registry::deregister into_inner(self) -> T105 pub fn into_inner(self) -> T { 106 self.inner 107 } 108 } 109 110 /// Be careful when using this method. All I/O operations that may block must go 111 /// through the [`do_io`] method. 112 /// 113 /// [`do_io`]: IoSource::do_io 114 impl<T> Deref for IoSource<T> { 115 type Target = T; 116 deref(&self) -> &Self::Target117 fn deref(&self) -> &Self::Target { 118 &self.inner 119 } 120 } 121 122 /// Be careful when using this method. All I/O operations that may block must go 123 /// through the [`do_io`] method. 124 /// 125 /// [`do_io`]: IoSource::do_io 126 impl<T> DerefMut for IoSource<T> { deref_mut(&mut self) -> &mut Self::Target127 fn deref_mut(&mut self) -> &mut Self::Target { 128 &mut self.inner 129 } 130 } 131 132 #[cfg(unix)] 133 impl<T> event::Source for IoSource<T> 134 where 135 T: AsRawFd, 136 { register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>137 fn register( 138 &mut self, 139 registry: &Registry, 140 token: Token, 141 interests: Interest, 142 ) -> io::Result<()> { 143 #[cfg(debug_assertions)] 144 self.selector_id.associate(registry)?; 145 registry 146 .selector() 147 .register(self.inner.as_raw_fd(), token, interests) 148 } 149 reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>150 fn reregister( 151 &mut self, 152 registry: &Registry, 153 token: Token, 154 interests: Interest, 155 ) -> io::Result<()> { 156 #[cfg(debug_assertions)] 157 self.selector_id.check_association(registry)?; 158 registry 159 .selector() 160 .reregister(self.inner.as_raw_fd(), token, interests) 161 } 162 deregister(&mut self, registry: &Registry) -> io::Result<()>163 fn deregister(&mut self, registry: &Registry) -> io::Result<()> { 164 #[cfg(debug_assertions)] 165 self.selector_id.remove_association(registry)?; 166 registry.selector().deregister(self.inner.as_raw_fd()) 167 } 168 } 169 170 #[cfg(windows)] 171 impl<T> event::Source for IoSource<T> 172 where 173 T: AsRawSocket, 174 { register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>175 fn register( 176 &mut self, 177 registry: &Registry, 178 token: Token, 179 interests: Interest, 180 ) -> io::Result<()> { 181 #[cfg(debug_assertions)] 182 self.selector_id.associate(registry)?; 183 self.state 184 .register(registry, token, interests, self.inner.as_raw_socket()) 185 } 186 reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>187 fn reregister( 188 &mut self, 189 registry: &Registry, 190 token: Token, 191 interests: Interest, 192 ) -> io::Result<()> { 193 #[cfg(debug_assertions)] 194 self.selector_id.check_association(registry)?; 195 self.state.reregister(registry, token, interests) 196 } 197 deregister(&mut self, _registry: &Registry) -> io::Result<()>198 fn deregister(&mut self, _registry: &Registry) -> io::Result<()> { 199 #[cfg(debug_assertions)] 200 self.selector_id.remove_association(_registry)?; 201 self.state.deregister() 202 } 203 } 204 205 #[cfg(target_os = "wasi")] 206 impl<T> event::Source for IoSource<T> 207 where 208 T: AsRawFd, 209 { register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>210 fn register( 211 &mut self, 212 registry: &Registry, 213 token: Token, 214 interests: Interest, 215 ) -> io::Result<()> { 216 #[cfg(debug_assertions)] 217 self.selector_id.associate(registry)?; 218 registry 219 .selector() 220 .register(self.inner.as_raw_fd() as _, token, interests) 221 } 222 reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>223 fn reregister( 224 &mut self, 225 registry: &Registry, 226 token: Token, 227 interests: Interest, 228 ) -> io::Result<()> { 229 #[cfg(debug_assertions)] 230 self.selector_id.check_association(registry)?; 231 registry 232 .selector() 233 .reregister(self.inner.as_raw_fd() as _, token, interests) 234 } 235 deregister(&mut self, registry: &Registry) -> io::Result<()>236 fn deregister(&mut self, registry: &Registry) -> io::Result<()> { 237 #[cfg(debug_assertions)] 238 self.selector_id.remove_association(registry)?; 239 registry.selector().deregister(self.inner.as_raw_fd() as _) 240 } 241 } 242 243 impl<T> fmt::Debug for IoSource<T> 244 where 245 T: fmt::Debug, 246 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 248 self.inner.fmt(f) 249 } 250 } 251 252 /// Used to associate an `IoSource` with a `sys::Selector`. 253 #[cfg(debug_assertions)] 254 #[derive(Debug)] 255 struct SelectorId { 256 id: AtomicUsize, 257 } 258 259 #[cfg(debug_assertions)] 260 impl SelectorId { 261 /// Value of `id` if `SelectorId` is not associated with any 262 /// `sys::Selector`. Valid selector ids start at 1. 263 const UNASSOCIATED: usize = 0; 264 265 /// Create a new `SelectorId`. new() -> SelectorId266 const fn new() -> SelectorId { 267 SelectorId { 268 id: AtomicUsize::new(Self::UNASSOCIATED), 269 } 270 } 271 272 /// Associate an I/O source with `registry`, returning an error if its 273 /// already registered. associate(&self, registry: &Registry) -> io::Result<()>274 fn associate(&self, registry: &Registry) -> io::Result<()> { 275 let registry_id = registry.selector().id(); 276 let previous_id = self.id.swap(registry_id, Ordering::AcqRel); 277 278 if previous_id == Self::UNASSOCIATED { 279 Ok(()) 280 } else { 281 Err(io::Error::new( 282 io::ErrorKind::AlreadyExists, 283 "I/O source already registered with a `Registry`", 284 )) 285 } 286 } 287 288 /// Check the association of an I/O source with `registry`, returning an 289 /// error if its registered with a different `Registry` or not registered at 290 /// all. check_association(&self, registry: &Registry) -> io::Result<()>291 fn check_association(&self, registry: &Registry) -> io::Result<()> { 292 let registry_id = registry.selector().id(); 293 let id = self.id.load(Ordering::Acquire); 294 295 if id == registry_id { 296 Ok(()) 297 } else if id == Self::UNASSOCIATED { 298 Err(io::Error::new( 299 io::ErrorKind::NotFound, 300 "I/O source not registered with `Registry`", 301 )) 302 } else { 303 Err(io::Error::new( 304 io::ErrorKind::AlreadyExists, 305 "I/O source already registered with a different `Registry`", 306 )) 307 } 308 } 309 310 /// Remove a previously made association from `registry`, returns an error 311 /// if it was not previously associated with `registry`. remove_association(&self, registry: &Registry) -> io::Result<()>312 fn remove_association(&self, registry: &Registry) -> io::Result<()> { 313 let registry_id = registry.selector().id(); 314 let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel); 315 316 if previous_id == registry_id { 317 Ok(()) 318 } else { 319 Err(io::Error::new( 320 io::ErrorKind::NotFound, 321 "I/O source not registered with `Registry`", 322 )) 323 } 324 } 325 } 326 327 #[cfg(debug_assertions)] 328 impl Clone for SelectorId { clone(&self) -> SelectorId329 fn clone(&self) -> SelectorId { 330 SelectorId { 331 id: AtomicUsize::new(self.id.load(Ordering::Acquire)), 332 } 333 } 334 } 335