• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::ops::{Deref, DerefMut};
2 #[cfg(unix)]
3 use std::os::unix::io::AsRawFd;
4 #[cfg(target_os = "wasi")]
5 use std::os::wasi::io::AsRawFd;
6 #[cfg(windows)]
7 use std::os::windows::io::AsRawSocket;
8 #[cfg(debug_assertions)]
9 use std::sync::atomic::{AtomicUsize, Ordering};
10 use std::{fmt, io};
11 
12 use crate::sys::IoSourceState;
13 use crate::{event, Interest, Registry, Token};
14 
/// Adapter for a [`RawFd`] or [`RawSocket`] providing an [`event::Source`]
/// implementation.
///
/// `IoSource` enables registering any FD or socket wrapper with [`Poll`].
///
/// While only implementations for TCP, UDP, and UDS (Unix only) are provided,
/// Mio supports registering any FD or socket that can be registered with the
/// underlying OS selector. `IoSource` provides the necessary bridge.
///
/// [`RawFd`]: std::os::unix::io::RawFd
/// [`RawSocket`]: std::os::windows::io::RawSocket
///
/// # Notes
///
/// To handle the registrations and events properly **all** I/O operations (such
/// as `read`, `write`, etc.) must go through the [`do_io`] method to ensure the
/// internal state is updated accordingly.
///
/// [`Poll`]: crate::Poll
/// [`do_io`]: IoSource::do_io
// The example below is wrapped in a block comment, so rustdoc neither renders
// nor runs it.
/*
///
/// # Examples
///
/// Basic usage.
///
/// ```
/// # use std::error::Error;
/// # fn main() -> Result<(), Box<dyn Error>> {
/// use mio::{Interest, Poll, Token};
/// use mio::IoSource;
///
/// use std::net;
///
/// let poll = Poll::new()?;
///
/// // Bind a std TCP listener.
/// let listener = net::TcpListener::bind("127.0.0.1:0")?;
/// // Wrap it in the `IoSource` type.
/// let mut listener = IoSource::new(listener);
///
/// // Register the listener.
/// poll.registry().register(&mut listener, Token(0), Interest::READABLE)?;
/// #     Ok(())
/// # }
/// ```
*/
pub struct IoSource<T> {
    /// Platform-specific state from `crate::sys`; [`IoSource::do_io`] delegates
    /// to it, and the Windows `event::Source` implementation registers through
    /// it.
    state: IoSourceState,
    /// The wrapped I/O object that gets registered with the selector.
    inner: T,
    /// Only present in builds with debug assertions: records which selector
    /// this source is currently associated with so misuse can be reported.
    #[cfg(debug_assertions)]
    selector_id: SelectorId,
}
68 
69 impl<T> IoSource<T> {
70     /// Create a new `IoSource`.
new(io: T) -> IoSource<T>71     pub fn new(io: T) -> IoSource<T> {
72         IoSource {
73             state: IoSourceState::new(),
74             inner: io,
75             #[cfg(debug_assertions)]
76             selector_id: SelectorId::new(),
77         }
78     }
79 
80     /// Execute an I/O operations ensuring that the socket receives more events
81     /// if it hits a [`WouldBlock`] error.
82     ///
83     /// # Notes
84     ///
85     /// This method is required to be called for **all** I/O operations to
86     /// ensure the user will receive events once the socket is ready again after
87     /// returning a [`WouldBlock`] error.
88     ///
89     /// [`WouldBlock`]: io::ErrorKind::WouldBlock
do_io<F, R>(&self, f: F) -> io::Result<R> where F: FnOnce(&T) -> io::Result<R>,90     pub fn do_io<F, R>(&self, f: F) -> io::Result<R>
91     where
92         F: FnOnce(&T) -> io::Result<R>,
93     {
94         self.state.do_io(f, &self.inner)
95     }
96 
97     /// Returns the I/O source, dropping the state.
98     ///
99     /// # Notes
100     ///
101     /// To ensure no more events are to be received for this I/O source first
102     /// [`deregister`] it.
103     ///
104     /// [`deregister`]: Registry::deregister
into_inner(self) -> T105     pub fn into_inner(self) -> T {
106         self.inner
107     }
108 }
109 
110 /// Be careful when using this method. All I/O operations that may block must go
111 /// through the [`do_io`] method.
112 ///
113 /// [`do_io`]: IoSource::do_io
114 impl<T> Deref for IoSource<T> {
115     type Target = T;
116 
deref(&self) -> &Self::Target117     fn deref(&self) -> &Self::Target {
118         &self.inner
119     }
120 }
121 
122 /// Be careful when using this method. All I/O operations that may block must go
123 /// through the [`do_io`] method.
124 ///
125 /// [`do_io`]: IoSource::do_io
126 impl<T> DerefMut for IoSource<T> {
deref_mut(&mut self) -> &mut Self::Target127     fn deref_mut(&mut self) -> &mut Self::Target {
128         &mut self.inner
129     }
130 }
131 
132 #[cfg(unix)]
133 impl<T> event::Source for IoSource<T>
134 where
135     T: AsRawFd,
136 {
register( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>137     fn register(
138         &mut self,
139         registry: &Registry,
140         token: Token,
141         interests: Interest,
142     ) -> io::Result<()> {
143         #[cfg(debug_assertions)]
144         self.selector_id.associate(registry)?;
145         registry
146             .selector()
147             .register(self.inner.as_raw_fd(), token, interests)
148     }
149 
reregister( &mut self, registry: &Registry, token: Token, interests: Interest, ) -> io::Result<()>150     fn reregister(
151         &mut self,
152         registry: &Registry,
153         token: Token,
154         interests: Interest,
155     ) -> io::Result<()> {
156         #[cfg(debug_assertions)]
157         self.selector_id.check_association(registry)?;
158         registry
159             .selector()
160             .reregister(self.inner.as_raw_fd(), token, interests)
161     }
162 
deregister(&mut self, registry: &Registry) -> io::Result<()>163     fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
164         #[cfg(debug_assertions)]
165         self.selector_id.remove_association(registry)?;
166         registry.selector().deregister(self.inner.as_raw_fd())
167     }
168 }
169 
/// Windows implementation: registration is delegated to the platform-specific
/// `IoSourceState`, which is given the raw socket of the wrapped object.
///
/// In builds with debug assertions every call also maintains/validates the
/// association between this source and the registry's selector.
#[cfg(windows)]
impl<T> event::Source for IoSource<T>
where
    T: AsRawSocket,
{
    /// Registers the wrapped socket with `registry`.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the internal state if registration
    /// fails. In debug builds it additionally returns `AlreadyExists` if the
    /// source is already associated with a registry.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;
        let result = self
            .state
            .register(registry, token, interests, self.inner.as_raw_socket());
        #[cfg(debug_assertions)]
        {
            // Registration failed, so the source is not actually registered:
            // undo the association made above, otherwise every retry would
            // fail with `AlreadyExists`.
            if result.is_err() {
                let _ = self.selector_id.remove_association(registry);
            }
        }
        result
    }

    /// Updates the token and interests of an existing registration.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the internal state. In debug builds it
    /// additionally fails if the source is not associated with `registry`.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;
        self.state.reregister(registry, token, interests)
    }

    /// Removes the registration held by the internal state.
    ///
    /// `_registry` is only consulted for the debug-build association check.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the internal state. In debug builds it
    /// additionally fails if the source is not associated with `_registry`.
    fn deregister(&mut self, _registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(_registry)?;
        self.state.deregister()
    }
}
204 
/// WASI implementation: the raw file descriptor of the wrapped object is cast
/// to the selector's descriptor type and handed to the registry's selector.
///
/// In builds with debug assertions every call also maintains/validates the
/// association between this source and the registry's selector.
#[cfg(target_os = "wasi")]
impl<T> event::Source for IoSource<T>
where
    T: AsRawFd,
{
    /// Registers the wrapped file descriptor with `registry`'s selector.
    ///
    /// # Errors
    ///
    /// Returns the selector's error if registration fails. In debug builds it
    /// additionally returns `AlreadyExists` if the source is already
    /// associated with a registry.
    fn register(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.associate(registry)?;
        let result = registry
            .selector()
            .register(self.inner.as_raw_fd() as _, token, interests);
        #[cfg(debug_assertions)]
        {
            // The selector rejected the registration, so the source is not
            // actually registered: undo the association made above, otherwise
            // every retry would fail with `AlreadyExists`.
            if result.is_err() {
                let _ = self.selector_id.remove_association(registry);
            }
        }
        result
    }

    /// Updates the token and interests of an existing registration.
    ///
    /// # Errors
    ///
    /// Returns the selector's error if the update fails. In debug builds it
    /// additionally fails if the source is not associated with `registry`.
    fn reregister(
        &mut self,
        registry: &Registry,
        token: Token,
        interests: Interest,
    ) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.check_association(registry)?;
        registry
            .selector()
            .reregister(self.inner.as_raw_fd() as _, token, interests)
    }

    /// Removes the wrapped file descriptor from `registry`'s selector.
    ///
    /// # Errors
    ///
    /// Returns the selector's error if removal fails. In debug builds it
    /// additionally fails if the source is not associated with `registry`.
    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
        #[cfg(debug_assertions)]
        self.selector_id.remove_association(registry)?;
        registry.selector().deregister(self.inner.as_raw_fd() as _)
    }
}
242 
243 impl<T> fmt::Debug for IoSource<T>
244 where
245     T: fmt::Debug,
246 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result247     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248         self.inner.fmt(f)
249     }
250 }
251 
/// Used to associate an `IoSource` with a `sys::Selector`.
///
/// Only compiled in builds with debug assertions enabled.
#[cfg(debug_assertions)]
#[derive(Debug)]
struct SelectorId {
    /// Id of the selector the source is associated with, or
    /// `SelectorId::UNASSOCIATED` when there is no association.
    id: AtomicUsize,
}
258 
259 #[cfg(debug_assertions)]
260 impl SelectorId {
261     /// Value of `id` if `SelectorId` is not associated with any
262     /// `sys::Selector`. Valid selector ids start at 1.
263     const UNASSOCIATED: usize = 0;
264 
265     /// Create a new `SelectorId`.
new() -> SelectorId266     const fn new() -> SelectorId {
267         SelectorId {
268             id: AtomicUsize::new(Self::UNASSOCIATED),
269         }
270     }
271 
272     /// Associate an I/O source with `registry`, returning an error if its
273     /// already registered.
associate(&self, registry: &Registry) -> io::Result<()>274     fn associate(&self, registry: &Registry) -> io::Result<()> {
275         let registry_id = registry.selector().id();
276         let previous_id = self.id.swap(registry_id, Ordering::AcqRel);
277 
278         if previous_id == Self::UNASSOCIATED {
279             Ok(())
280         } else {
281             Err(io::Error::new(
282                 io::ErrorKind::AlreadyExists,
283                 "I/O source already registered with a `Registry`",
284             ))
285         }
286     }
287 
288     /// Check the association of an I/O source with `registry`, returning an
289     /// error if its registered with a different `Registry` or not registered at
290     /// all.
check_association(&self, registry: &Registry) -> io::Result<()>291     fn check_association(&self, registry: &Registry) -> io::Result<()> {
292         let registry_id = registry.selector().id();
293         let id = self.id.load(Ordering::Acquire);
294 
295         if id == registry_id {
296             Ok(())
297         } else if id == Self::UNASSOCIATED {
298             Err(io::Error::new(
299                 io::ErrorKind::NotFound,
300                 "I/O source not registered with `Registry`",
301             ))
302         } else {
303             Err(io::Error::new(
304                 io::ErrorKind::AlreadyExists,
305                 "I/O source already registered with a different `Registry`",
306             ))
307         }
308     }
309 
310     /// Remove a previously made association from `registry`, returns an error
311     /// if it was not previously associated with `registry`.
remove_association(&self, registry: &Registry) -> io::Result<()>312     fn remove_association(&self, registry: &Registry) -> io::Result<()> {
313         let registry_id = registry.selector().id();
314         let previous_id = self.id.swap(Self::UNASSOCIATED, Ordering::AcqRel);
315 
316         if previous_id == registry_id {
317             Ok(())
318         } else {
319             Err(io::Error::new(
320                 io::ErrorKind::NotFound,
321                 "I/O source not registered with `Registry`",
322             ))
323         }
324     }
325 }
326 
327 #[cfg(debug_assertions)]
328 impl Clone for SelectorId {
clone(&self) -> SelectorId329     fn clone(&self) -> SelectorId {
330         SelectorId {
331             id: AtomicUsize::new(self.id.load(Ordering::Acquire)),
332         }
333     }
334 }
335