use crate::{event, sys, Events, Interest, Token}; use log::trace; #[cfg(unix)] use std::os::unix::io::{AsRawFd, RawFd}; use std::time::Duration; use std::{fmt, io}; /// Polls for readiness events on all registered values. /// /// `Poll` allows a program to monitor a large number of [`event::Source`]s, /// waiting until one or more become "ready" for some class of operations; e.g. /// reading and writing. An event source is considered ready if it is possible /// to immediately perform a corresponding operation; e.g. [`read`] or /// [`write`]. /// /// To use `Poll`, an `event::Source` must first be registered with the `Poll` /// instance using the [`register`] method on its associated `Register`, /// supplying readiness interest. The readiness interest tells `Poll` which /// specific operations on the handle to monitor for readiness. A `Token` is /// also passed to the [`register`] function. When `Poll` returns a readiness /// event, it will include this token. This associates the event with the /// event source that generated the event. /// /// [`event::Source`]: ./event/trait.Source.html /// [`read`]: ./net/struct.TcpStream.html#method.read /// [`write`]: ./net/struct.TcpStream.html#method.write /// [`register`]: struct.Registry.html#method.register /// /// # Examples /// /// A basic example -- establishing a `TcpStream` connection. /// #[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")] #[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")] /// # use std::error::Error; /// # fn main() -> Result<(), Box> { /// use mio::{Events, Poll, Interest, Token}; /// use mio::net::TcpStream; /// /// use std::net::{self, SocketAddr}; /// /// // Bind a server socket to connect to. /// let addr: SocketAddr = "127.0.0.1:0".parse()?; /// let server = net::TcpListener::bind(addr)?; /// /// // Construct a new `Poll` handle as well as the `Events` we'll store into /// let mut poll = Poll::new()?; /// let mut events = Events::with_capacity(1024); /// /// // Connect the stream /// let mut stream = TcpStream::connect(server.local_addr()?)?; /// /// // Register the stream with `Poll` /// poll.registry().register(&mut stream, Token(0), Interest::READABLE | Interest::WRITABLE)?; /// /// // Wait for the socket to become ready. This has to happens in a loop to /// // handle spurious wakeups. /// loop { /// poll.poll(&mut events, None)?; /// /// for event in &events { /// if event.token() == Token(0) && event.is_writable() { /// // The socket connected (probably, it could still be a spurious /// // wakeup) /// return Ok(()); /// } /// } /// } /// # } /// ``` /// /// # Portability /// /// Using `Poll` provides a portable interface across supported platforms as /// long as the caller takes the following into consideration: /// /// ### Spurious events /// /// [`Poll::poll`] may return readiness events even if the associated /// event source is not actually ready. Given the same code, this may /// happen more on some platforms than others. It is important to never assume /// that, just because a readiness event was received, that the associated /// operation will succeed as well. /// /// If operation fails with [`WouldBlock`], then the caller should not treat /// this as an error, but instead should wait until another readiness event is /// received. /// /// ### Draining readiness /// /// Once a readiness event is received, the corresponding operation must be /// performed repeatedly until it returns [`WouldBlock`]. Unless this is done, /// there is no guarantee that another readiness event will be delivered, even /// if further data is received for the event source. /// /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock /// /// ### Readiness operations /// /// The only readiness operations that are guaranteed to be present on all /// supported platforms are [`readable`] and [`writable`]. All other readiness /// operations may have false negatives and as such should be considered /// **hints**. This means that if a socket is registered with [`readable`] /// interest and either an error or close is received, a readiness event will /// be generated for the socket, but it **may** only include `readable` /// readiness. Also note that, given the potential for spurious events, /// receiving a readiness event with `read_closed`, `write_closed`, or `error` /// doesn't actually mean that a `read` on the socket will return a result /// matching the readiness event. /// /// In other words, portable programs that explicitly check for [`read_closed`], /// [`write_closed`], or [`error`] readiness should be doing so as an /// **optimization** and always be able to handle an error or close situation /// when performing the actual read operation. /// /// [`readable`]: ./event/struct.Event.html#method.is_readable /// [`writable`]: ./event/struct.Event.html#method.is_writable /// [`error`]: ./event/struct.Event.html#method.is_error /// [`read_closed`]: ./event/struct.Event.html#method.is_read_closed /// [`write_closed`]: ./event/struct.Event.html#method.is_write_closed /// /// ### Registering handles /// /// Unless otherwise noted, it should be assumed that types implementing /// [`event::Source`] will never become ready unless they are registered with /// `Poll`. /// /// For example: /// #[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")] #[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")] /// # use std::error::Error; /// # use std::net; /// # fn main() -> Result<(), Box> { /// use mio::{Poll, Interest, Token}; /// use mio::net::TcpStream; /// use std::net::SocketAddr; /// use std::time::Duration; /// use std::thread; /// /// let address: SocketAddr = "127.0.0.1:0".parse()?; /// let listener = net::TcpListener::bind(address)?; /// let mut sock = TcpStream::connect(listener.local_addr()?)?; /// /// thread::sleep(Duration::from_secs(1)); /// /// let poll = Poll::new()?; /// /// // The connect is not guaranteed to have started until it is registered at /// // this point /// poll.registry().register(&mut sock, Token(0), Interest::READABLE | Interest::WRITABLE)?; /// # Ok(()) /// # } /// ``` /// /// ### Dropping `Poll` /// /// When the `Poll` instance is dropped it may cancel in-flight operations for /// the registered [event sources], meaning that no further events for them may /// be received. It also means operations on the registered event sources may no /// longer work. It is up to the user to keep the `Poll` instance alive while /// registered event sources are being used. /// /// [event sources]: ./event/trait.Source.html /// /// # Implementation notes /// /// `Poll` is backed by the selector provided by the operating system. /// /// | OS | Selector | /// |---------------|-----------| /// | Android | [epoll] | /// | DragonFly BSD | [kqueue] | /// | FreeBSD | [kqueue] | /// | Linux | [epoll] | /// | NetBSD | [kqueue] | /// | OpenBSD | [kqueue] | /// | Solaris | [epoll] | /// | illumos | [epoll] | /// | Windows | [IOCP] | /// | iOS | [kqueue] | /// | macOS | [kqueue] | /// /// On all supported platforms, socket operations are handled by using the /// system selector. Platform specific extensions (e.g. [`SourceFd`]) allow /// accessing other features provided by individual system selectors. For /// example, Linux's [`signalfd`] feature can be used by registering the FD with /// `Poll` via [`SourceFd`]. /// /// On all platforms except windows, a call to [`Poll::poll`] is mostly just a /// direct call to the system selector. However, [IOCP] uses a completion model /// instead of a readiness model. In this case, `Poll` must adapt the completion /// model Mio's API. While non-trivial, the bridge layer is still quite /// efficient. The most expensive part being calls to `read` and `write` require /// data to be copied into an intermediate buffer before it is passed to the /// kernel. /// /// [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html /// [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2 /// [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx /// [`signalfd`]: http://man7.org/linux/man-pages/man2/signalfd.2.html /// [`SourceFd`]: unix/struct.SourceFd.html /// [`Poll::poll`]: struct.Poll.html#method.poll pub struct Poll { registry: Registry, } /// Registers I/O resources. pub struct Registry { selector: sys::Selector, } impl Poll { /// Create a separate `Registry` which can be used to register /// `event::Source`s. pub fn registry(&self) -> &Registry { &self.registry } /// Wait for readiness events /// /// Blocks the current thread and waits for readiness events for any of the /// [`event::Source`]s that have been registered with this `Poll` instance. /// The function will block until either at least one readiness event has /// been received or `timeout` has elapsed. A `timeout` of `None` means that /// `poll` will block until a readiness event has been received. /// /// The supplied `events` will be cleared and newly received readiness events /// will be pushed onto the end. At most `events.capacity()` events will be /// returned. If there are further pending readiness events, they will be /// returned on the next call to `poll`. /// /// A single call to `poll` may result in multiple readiness events being /// returned for a single event source. For example, if a TCP socket becomes /// both readable and writable, it may be possible for a single readiness /// event to be returned with both [`readable`] and [`writable`] readiness /// **OR** two separate events may be returned, one with [`readable`] set /// and one with [`writable`] set. /// /// Note that the `timeout` will be rounded up to the system clock /// granularity (usually 1ms), and kernel scheduling delays mean that /// the blocking interval may be overrun by a small amount. /// /// See the [struct] level documentation for a higher level discussion of /// polling. /// /// [`event::Source`]: ./event/trait.Source.html /// [`readable`]: struct.Interest.html#associatedconstant.READABLE /// [`writable`]: struct.Interest.html#associatedconstant.WRITABLE /// [struct]: struct.Poll.html /// [`iter`]: ./event/struct.Events.html#method.iter /// /// # Notes /// /// This returns any errors without attempting to retry, previous versions /// of Mio would automatically retry the poll call if it was interrupted /// (if `EINTR` was returned). /// /// # Examples /// /// A basic example -- establishing a `TcpStream` connection. /// #[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")] #[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")] /// # use std::error::Error; /// # fn main() -> Result<(), Box> { /// use mio::{Events, Poll, Interest, Token}; /// use mio::net::TcpStream; /// /// use std::net::{TcpListener, SocketAddr}; /// use std::thread; /// /// // Bind a server socket to connect to. /// let addr: SocketAddr = "127.0.0.1:0".parse()?; /// let server = TcpListener::bind(addr)?; /// let addr = server.local_addr()?.clone(); /// /// // Spawn a thread to accept the socket /// thread::spawn(move || { /// let _ = server.accept(); /// }); /// /// // Construct a new `Poll` handle as well as the `Events` we'll store into /// let mut poll = Poll::new()?; /// let mut events = Events::with_capacity(1024); /// /// // Connect the stream /// let mut stream = TcpStream::connect(addr)?; /// /// // Register the stream with `Poll` /// poll.registry().register( /// &mut stream, /// Token(0), /// Interest::READABLE | Interest::WRITABLE)?; /// /// // Wait for the socket to become ready. This has to happens in a loop to /// // handle spurious wakeups. /// loop { /// poll.poll(&mut events, None)?; /// /// for event in &events { /// if event.token() == Token(0) && event.is_writable() { /// // The socket connected (probably, it could still be a spurious /// // wakeup) /// return Ok(()); /// } /// } /// } /// # } /// ``` /// /// [struct]: # pub fn poll(&mut self, events: &mut Events, timeout: Option) -> io::Result<()> { self.registry.selector.select(events.sys(), timeout) } } cfg_os_poll! { impl Poll { /// Return a new `Poll` handle. /// /// This function will make a syscall to the operating system to create /// the system selector. If this syscall fails, `Poll::new` will return /// with the error. /// /// See [struct] level docs for more details. /// /// [struct]: struct.Poll.html /// /// # Examples /// /// ``` /// # use std::error::Error; /// # fn main() -> Result<(), Box> { /// use mio::{Poll, Events}; /// use std::time::Duration; /// /// let mut poll = match Poll::new() { /// Ok(poll) => poll, /// Err(e) => panic!("failed to create Poll instance; err={:?}", e), /// }; /// /// // Create a structure to receive polled events /// let mut events = Events::with_capacity(1024); /// /// // Wait for events, but none will be received because no /// // `event::Source`s have been registered with this `Poll` instance. /// poll.poll(&mut events, Some(Duration::from_millis(500)))?; /// assert!(events.is_empty()); /// # Ok(()) /// # } /// ``` pub fn new() -> io::Result { sys::Selector::new().map(|selector| Poll { registry: Registry { selector }, }) } } } #[cfg(unix)] impl AsRawFd for Poll { fn as_raw_fd(&self) -> RawFd { self.registry.as_raw_fd() } } impl fmt::Debug for Poll { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Poll").finish() } } impl Registry { /// Register an [`event::Source`] with the `Poll` instance. /// /// Once registered, the `Poll` instance will monitor the event source for /// readiness state changes. When it notices a state change, it will return /// a readiness event for the handle the next time [`poll`] is called. /// /// See [`Poll`] docs for a high level overview. /// /// # Arguments /// /// `source: &S: event::Source`: This is the source of events that the /// `Poll` instance should monitor for readiness state changes. /// /// `token: Token`: The caller picks a token to associate with the socket. /// When [`poll`] returns an event for the handle, this token is included. /// This allows the caller to map the event to its source. The token /// associated with the `event::Source` can be changed at any time by /// calling [`reregister`]. /// /// See documentation on [`Token`] for an example showing how to pick /// [`Token`] values. /// /// `interest: Interest`: Specifies which operations `Poll` should monitor /// for readiness. `Poll` will only return readiness events for operations /// specified by this argument. /// /// If a socket is registered with readable interest and the socket becomes /// writable, no event will be returned from [`poll`]. /// /// The readiness interest for an `event::Source` can be changed at any time /// by calling [`reregister`]. /// /// # Notes /// /// Callers must ensure that if a source being registered with a `Poll` /// instance was previously registered with that `Poll` instance, then a /// call to [`deregister`] has already occurred. Consecutive calls to /// `register` is undefined behavior. /// /// Unless otherwise specified, the caller should assume that once an event /// source is registered with a `Poll` instance, it is bound to that `Poll` /// instance for the lifetime of the event source. This remains true even /// if the event source is deregistered from the poll instance using /// [`deregister`]. /// /// [`event::Source`]: ./event/trait.Source.html /// [`poll`]: struct.Poll.html#method.poll /// [`reregister`]: struct.Registry.html#method.reregister /// [`deregister`]: struct.Registry.html#method.deregister /// [`Token`]: struct.Token.html /// /// # Examples /// #[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")] #[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")] /// # use std::error::Error; /// # use std::net; /// # fn main() -> Result<(), Box> { /// use mio::{Events, Poll, Interest, Token}; /// use mio::net::TcpStream; /// use std::net::SocketAddr; /// use std::time::{Duration, Instant}; /// /// let mut poll = Poll::new()?; /// /// let address: SocketAddr = "127.0.0.1:0".parse()?; /// let listener = net::TcpListener::bind(address)?; /// let mut socket = TcpStream::connect(listener.local_addr()?)?; /// /// // Register the socket with `poll` /// poll.registry().register( /// &mut socket, /// Token(0), /// Interest::READABLE | Interest::WRITABLE)?; /// /// let mut events = Events::with_capacity(1024); /// let start = Instant::now(); /// let timeout = Duration::from_millis(500); /// /// loop { /// let elapsed = start.elapsed(); /// /// if elapsed >= timeout { /// // Connection timed out /// return Ok(()); /// } /// /// let remaining = timeout - elapsed; /// poll.poll(&mut events, Some(remaining))?; /// /// for event in &events { /// if event.token() == Token(0) { /// // Something (probably) happened on the socket. /// return Ok(()); /// } /// } /// } /// # } /// ``` pub fn register(&self, source: &mut S, token: Token, interests: Interest) -> io::Result<()> where S: event::Source + ?Sized, { trace!( "registering event source with poller: token={:?}, interests={:?}", token, interests ); source.register(self, token, interests) } /// Re-register an [`event::Source`] with the `Poll` instance. /// /// Re-registering an event source allows changing the details of the /// registration. Specifically, it allows updating the associated `token` /// and `interests` specified in previous `register` and `reregister` calls. /// /// The `reregister` arguments fully override the previous values. In other /// words, if a socket is registered with [`readable`] interest and the call /// to `reregister` specifies [`writable`], then read interest is no longer /// requested for the handle. /// /// The event source must have previously been registered with this instance /// of `Poll`, otherwise the behavior is undefined. /// /// See the [`register`] documentation for details about the function /// arguments and see the [`struct`] docs for a high level overview of /// polling. /// /// # Examples /// #[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")] #[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")] /// # use std::error::Error; /// # use std::net; /// # fn main() -> Result<(), Box> { /// use mio::{Poll, Interest, Token}; /// use mio::net::TcpStream; /// use std::net::SocketAddr; /// /// let poll = Poll::new()?; /// /// let address: SocketAddr = "127.0.0.1:0".parse()?; /// let listener = net::TcpListener::bind(address)?; /// let mut socket = TcpStream::connect(listener.local_addr()?)?; /// /// // Register the socket with `poll`, requesting readable /// poll.registry().register( /// &mut socket, /// Token(0), /// Interest::READABLE)?; /// /// // Reregister the socket specifying write interest instead. Even though /// // the token is the same it must be specified. /// poll.registry().reregister( /// &mut socket, /// Token(0), /// Interest::WRITABLE)?; /// # Ok(()) /// # } /// ``` /// /// [`event::Source`]: ./event/trait.Source.html /// [`struct`]: struct.Poll.html /// [`register`]: struct.Registry.html#method.register /// [`readable`]: ./event/struct.Event.html#is_readable /// [`writable`]: ./event/struct.Event.html#is_writable pub fn reregister(&self, source: &mut S, token: Token, interests: Interest) -> io::Result<()> where S: event::Source + ?Sized, { trace!( "reregistering event source with poller: token={:?}, interests={:?}", token, interests ); source.reregister(self, token, interests) } /// Deregister an [`event::Source`] with the `Poll` instance. /// /// When an event source is deregistered, the `Poll` instance will no longer /// monitor it for readiness state changes. Deregistering clears up any /// internal resources needed to track the handle. After an explicit call /// to this method completes, it is guaranteed that the token previously /// registered to this handle will not be returned by a future poll, so long /// as a happens-before relationship is established between this call and /// the poll. /// /// The event source must have previously been registered with this instance /// of `Poll`, otherwise the behavior is undefined. /// /// A handle can be passed back to `register` after it has been /// deregistered; however, it must be passed back to the **same** `Poll` /// instance, otherwise the behavior is undefined. /// /// # Examples /// #[cfg_attr(all(feature = "os-poll", features = "net"), doc = "```")] #[cfg_attr(not(all(feature = "os-poll", features = "net")), doc = "```ignore")] /// # use std::error::Error; /// # use std::net; /// # fn main() -> Result<(), Box> { /// use mio::{Events, Poll, Interest, Token}; /// use mio::net::TcpStream; /// use std::net::SocketAddr; /// use std::time::Duration; /// /// let mut poll = Poll::new()?; /// /// let address: SocketAddr = "127.0.0.1:0".parse()?; /// let listener = net::TcpListener::bind(address)?; /// let mut socket = TcpStream::connect(listener.local_addr()?)?; /// /// // Register the socket with `poll` /// poll.registry().register( /// &mut socket, /// Token(0), /// Interest::READABLE)?; /// /// poll.registry().deregister(&mut socket)?; /// /// let mut events = Events::with_capacity(1024); /// /// // Set a timeout because this poll should never receive any events. /// poll.poll(&mut events, Some(Duration::from_secs(1)))?; /// assert!(events.is_empty()); /// # Ok(()) /// # } /// ``` pub fn deregister(&self, source: &mut S) -> io::Result<()> where S: event::Source + ?Sized, { trace!("deregistering event source from poller"); source.deregister(self) } /// Creates a new independently owned `Registry`. /// /// Event sources registered with this `Registry` will be registered with /// the original `Registry` and `Poll` instance. pub fn try_clone(&self) -> io::Result { self.selector .try_clone() .map(|selector| Registry { selector }) } /// Internal check to ensure only a single `Waker` is active per [`Poll`] /// instance. #[cfg(debug_assertions)] pub(crate) fn register_waker(&self) { if self.selector.register_waker() { panic!("Only a single `Waker` can be active per `Poll` instance"); } } } impl fmt::Debug for Registry { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Registry").finish() } } #[cfg(unix)] impl AsRawFd for Registry { fn as_raw_fd(&self) -> RawFd { self.selector.as_raw_fd() } } /// Get access to the `sys::Selector` from `Registry`. pub(crate) fn selector(registry: &Registry) -> &sys::Selector { ®istry.selector } cfg_os_poll! { #[cfg(unix)] #[test] pub fn as_raw_fd() { let poll = Poll::new().unwrap(); assert!(poll.as_raw_fd() > 0); } }