1 use crate::io::interest::Interest; 2 use crate::runtime::io::Registration; 3 use crate::runtime::scheduler; 4 5 use mio::event::Source; 6 use std::fmt; 7 use std::io; 8 use std::ops::Deref; 9 use std::panic::{RefUnwindSafe, UnwindSafe}; 10 11 cfg_io_driver! { 12 /// Associates an I/O resource that implements the [`std::io::Read`] and/or 13 /// [`std::io::Write`] traits with the reactor that drives it. 14 /// 15 /// `PollEvented` uses [`Registration`] internally to take a type that 16 /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or 17 /// [`std::io::Write`] and associate it with a reactor that will drive it. 18 /// 19 /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be 20 /// used from within the future's execution model. As such, the 21 /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`] 22 /// implementations using the underlying I/O resource as well as readiness 23 /// events provided by the reactor. 24 /// 25 /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is 26 /// `Sync`), the caller must ensure that there are at most two tasks that 27 /// use a `PollEvented` instance concurrently. One for reading and one for 28 /// writing. While violating this requirement is "safe" from a Rust memory 29 /// model point of view, it will result in unexpected behavior in the form 30 /// of lost notifications and tasks hanging. 31 /// 32 /// ## Readiness events 33 /// 34 /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations, 35 /// this type also supports access to the underlying readiness event stream. 36 /// While similar in function to what [`Registration`] provides, the 37 /// semantics are a bit different. 38 /// 39 /// Two functions are provided to access the readiness events: 40 /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the 41 /// current readiness state of the `PollEvented` instance. If 42 /// [`poll_read_ready`] indicates read readiness, immediately calling 43 /// [`poll_read_ready`] again will also indicate read readiness. 44 /// 45 /// When the operation is attempted and is unable to succeed due to the I/O 46 /// resource not being ready, the caller must call [`clear_readiness`]. 47 /// This clears the readiness state until a new readiness event is received. 48 /// 49 /// This allows the caller to implement additional functions. For example, 50 /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and 51 /// [`clear_readiness`]. 52 /// 53 /// ## Platform-specific events 54 /// 55 /// `PollEvented` also allows receiving platform-specific `mio::Ready` events. 56 /// These events are included as part of the read readiness event stream. The 57 /// write readiness event stream is only for `Ready::writable()` events. 58 /// 59 /// [`AsyncRead`]: crate::io::AsyncRead 60 /// [`AsyncWrite`]: crate::io::AsyncWrite 61 /// [`TcpListener`]: crate::net::TcpListener 62 /// [`clear_readiness`]: Registration::clear_readiness 63 /// [`poll_read_ready`]: Registration::poll_read_ready 64 /// [`poll_write_ready`]: Registration::poll_write_ready 65 pub(crate) struct PollEvented<E: Source> { 66 io: Option<E>, 67 registration: Registration, 68 } 69 } 70 71 // ===== impl PollEvented ===== 72 73 impl<E: Source> PollEvented<E> { 74 /// Creates a new `PollEvented` associated with the default reactor. 75 /// 76 /// The returned `PollEvented` has readable and writable interests. For more control, use 77 /// [`Self::new_with_interest`]. 78 /// 79 /// # Panics 80 /// 81 /// This function panics if thread-local runtime is not set. 82 /// 83 /// The runtime is usually set implicitly when this function is called 84 /// from a future driven by a tokio runtime, otherwise runtime can be set 85 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. 86 #[track_caller] 87 #[cfg_attr(feature = "signal", allow(unused))] new(io: E) -> io::Result<Self>88 pub(crate) fn new(io: E) -> io::Result<Self> { 89 PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE) 90 } 91 92 /// Creates a new `PollEvented` associated with the default reactor, for 93 /// specific `Interest` state. `new_with_interest` should be used over `new` 94 /// when you need control over the readiness state, such as when a file 95 /// descriptor only allows reads. This does not add `hup` or `error` so if 96 /// you are interested in those states, you will need to add them to the 97 /// readiness state passed to this function. 98 /// 99 /// # Panics 100 /// 101 /// This function panics if thread-local runtime is not set. 102 /// 103 /// The runtime is usually set implicitly when this function is called from 104 /// a future driven by a tokio runtime, otherwise runtime can be set 105 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) 106 /// function. 107 #[track_caller] 108 #[cfg_attr(feature = "signal", allow(unused))] new_with_interest(io: E, interest: Interest) -> io::Result<Self>109 pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { 110 Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current()) 111 } 112 113 #[track_caller] new_with_interest_and_handle( mut io: E, interest: Interest, handle: scheduler::Handle, ) -> io::Result<Self>114 pub(crate) fn new_with_interest_and_handle( 115 mut io: E, 116 interest: Interest, 117 handle: scheduler::Handle, 118 ) -> io::Result<Self> { 119 let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; 120 Ok(Self { 121 io: Some(io), 122 registration, 123 }) 124 } 125 126 /// Returns a reference to the registration. 127 #[cfg(feature = "net")] registration(&self) -> &Registration128 pub(crate) fn registration(&self) -> &Registration { 129 &self.registration 130 } 131 132 /// Deregisters the inner io from the registration and returns a Result containing the inner io. 133 #[cfg(any(feature = "net", feature = "process"))] into_inner(mut self) -> io::Result<E>134 pub(crate) fn into_inner(mut self) -> io::Result<E> { 135 let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. 136 self.registration.deregister(&mut inner)?; 137 Ok(inner) 138 } 139 } 140 141 feature! { 142 #![any(feature = "net", all(unix, feature = "process"))] 143 144 use crate::io::ReadBuf; 145 use std::task::{Context, Poll}; 146 147 impl<E: Source> PollEvented<E> { 148 // Safety: The caller must ensure that `E` can read into uninitialized memory 149 pub(crate) unsafe fn poll_read<'a>( 150 &'a self, 151 cx: &mut Context<'_>, 152 buf: &mut ReadBuf<'_>, 153 ) -> Poll<io::Result<()>> 154 where 155 &'a E: io::Read + 'a, 156 { 157 use std::io::Read; 158 159 loop { 160 let evt = ready!(self.registration.poll_read_ready(cx))?; 161 162 let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); 163 let len = b.len(); 164 165 match self.io.as_ref().unwrap().read(b) { 166 Ok(n) => { 167 // if we read a partially full buffer, this is sufficient on unix to show 168 // that the socket buffer has been drained. Unfortunately this assumption 169 // fails for level-triggered selectors (like on Windows or poll even for 170 // UNIX): https://github.com/tokio-rs/tokio/issues/5866 171 if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < len) { 172 self.registration.clear_readiness(evt); 173 } 174 175 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the 176 // buffer. 177 buf.assume_init(n); 178 buf.advance(n); 179 return Poll::Ready(Ok(())); 180 }, 181 Err(e) if e.kind() == io::ErrorKind::WouldBlock => { 182 self.registration.clear_readiness(evt); 183 } 184 Err(e) => return Poll::Ready(Err(e)), 185 } 186 } 187 } 188 189 pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> 190 where 191 &'a E: io::Write + 'a, 192 { 193 use std::io::Write; 194 195 loop { 196 let evt = ready!(self.registration.poll_write_ready(cx))?; 197 198 match self.io.as_ref().unwrap().write(buf) { 199 Ok(n) => { 200 // if we write only part of our buffer, this is sufficient on unix to show 201 // that the socket buffer is full. Unfortunately this assumption 202 // fails for level-triggered selectors (like on Windows or poll even for 203 // UNIX): https://github.com/tokio-rs/tokio/issues/5866 204 if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < buf.len()) { 205 self.registration.clear_readiness(evt); 206 } 207 208 return Poll::Ready(Ok(n)); 209 }, 210 Err(e) if e.kind() == io::ErrorKind::WouldBlock => { 211 self.registration.clear_readiness(evt); 212 } 213 Err(e) => return Poll::Ready(Err(e)), 214 } 215 } 216 } 217 218 #[cfg(any(feature = "net", feature = "process"))] 219 pub(crate) fn poll_write_vectored<'a>( 220 &'a self, 221 cx: &mut Context<'_>, 222 bufs: &[io::IoSlice<'_>], 223 ) -> Poll<io::Result<usize>> 224 where 225 &'a E: io::Write + 'a, 226 { 227 use std::io::Write; 228 self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs)) 229 } 230 } 231 } 232 233 impl<E: Source> UnwindSafe for PollEvented<E> {} 234 235 impl<E: Source> RefUnwindSafe for PollEvented<E> {} 236 237 impl<E: Source> Deref for PollEvented<E> { 238 type Target = E; 239 deref(&self) -> &E240 fn deref(&self) -> &E { 241 self.io.as_ref().unwrap() 242 } 243 } 244 245 impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result246 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 247 f.debug_struct("PollEvented").field("io", &self.io).finish() 248 } 249 } 250 251 impl<E: Source> Drop for PollEvented<E> { drop(&mut self)252 fn drop(&mut self) { 253 if let Some(mut io) = self.io.take() { 254 // Ignore errors 255 let _ = self.registration.deregister(&mut io); 256 } 257 } 258 } 259