1 use crate::io::driver::{Handle, Interest, Registration}; 2 3 use mio::event::Source; 4 use std::fmt; 5 use std::io; 6 use std::ops::Deref; 7 8 cfg_io_driver! { 9 /// Associates an I/O resource that implements the [`std::io::Read`] and/or 10 /// [`std::io::Write`] traits with the reactor that drives it. 11 /// 12 /// `PollEvented` uses [`Registration`] internally to take a type that 13 /// implements [`mio::event::Source`] as well as [`std::io::Read`] and or 14 /// [`std::io::Write`] and associate it with a reactor that will drive it. 15 /// 16 /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be 17 /// used from within the future's execution model. As such, the 18 /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`] 19 /// implementations using the underlying I/O resource as well as readiness 20 /// events provided by the reactor. 21 /// 22 /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is 23 /// `Sync`), the caller must ensure that there are at most two tasks that 24 /// use a `PollEvented` instance concurrently. One for reading and one for 25 /// writing. While violating this requirement is "safe" from a Rust memory 26 /// model point of view, it will result in unexpected behavior in the form 27 /// of lost notifications and tasks hanging. 28 /// 29 /// ## Readiness events 30 /// 31 /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations, 32 /// this type also supports access to the underlying readiness event stream. 33 /// While similar in function to what [`Registration`] provides, the 34 /// semantics are a bit different. 35 /// 36 /// Two functions are provided to access the readiness events: 37 /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the 38 /// current readiness state of the `PollEvented` instance. If 39 /// [`poll_read_ready`] indicates read readiness, immediately calling 40 /// [`poll_read_ready`] again will also indicate read readiness. 41 /// 42 /// When the operation is attempted and is unable to succeed due to the I/O 43 /// resource not being ready, the caller must call `clear_readiness`. 44 /// This clears the readiness state until a new readiness event is received. 45 /// 46 /// This allows the caller to implement additional functions. For example, 47 /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and 48 /// `clear_read_ready`. 49 /// 50 /// ## Platform-specific events 51 /// 52 /// `PollEvented` also allows receiving platform-specific `mio::Ready` events. 53 /// These events are included as part of the read readiness event stream. The 54 /// write readiness event stream is only for `Ready::writable()` events. 55 /// 56 /// [`AsyncRead`]: crate::io::AsyncRead 57 /// [`AsyncWrite`]: crate::io::AsyncWrite 58 /// [`TcpListener`]: crate::net::TcpListener 59 /// [`poll_read_ready`]: Registration::poll_read_ready 60 /// [`poll_write_ready`]: Registration::poll_write_ready 61 pub(crate) struct PollEvented<E: Source> { 62 io: Option<E>, 63 registration: Registration, 64 } 65 } 66 67 // ===== impl PollEvented ===== 68 69 impl<E: Source> PollEvented<E> { 70 /// Creates a new `PollEvented` associated with the default reactor. 71 /// 72 /// # Panics 73 /// 74 /// This function panics if thread-local runtime is not set. 75 /// 76 /// The runtime is usually set implicitly when this function is called 77 /// from a future driven by a tokio runtime, otherwise runtime can be set 78 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. 79 #[cfg_attr(feature = "signal", allow(unused))] new(io: E) -> io::Result<Self>80 pub(crate) fn new(io: E) -> io::Result<Self> { 81 PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE) 82 } 83 84 /// Creates a new `PollEvented` associated with the default reactor, for 85 /// specific `Interest` state. `new_with_interest` should be used over `new` 86 /// when you need control over the readiness state, such as when a file 87 /// descriptor only allows reads. This does not add `hup` or `error` so if 88 /// you are interested in those states, you will need to add them to the 89 /// readiness state passed to this function. 90 /// 91 /// # Panics 92 /// 93 /// This function panics if thread-local runtime is not set. 94 /// 95 /// The runtime is usually set implicitly when this function is called from 96 /// a future driven by a tokio runtime, otherwise runtime can be set 97 /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) 98 /// function. 99 #[cfg_attr(feature = "signal", allow(unused))] new_with_interest(io: E, interest: Interest) -> io::Result<Self>100 pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> { 101 Self::new_with_interest_and_handle(io, interest, Handle::current()) 102 } 103 new_with_interest_and_handle( mut io: E, interest: Interest, handle: Handle, ) -> io::Result<Self>104 pub(crate) fn new_with_interest_and_handle( 105 mut io: E, 106 interest: Interest, 107 handle: Handle, 108 ) -> io::Result<Self> { 109 let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?; 110 Ok(Self { 111 io: Some(io), 112 registration, 113 }) 114 } 115 116 /// Returns a reference to the registration. 117 #[cfg(any( 118 feature = "net", 119 all(unix, feature = "process"), 120 all(unix, feature = "signal"), 121 ))] registration(&self) -> &Registration122 pub(crate) fn registration(&self) -> &Registration { 123 &self.registration 124 } 125 126 /// Deregisters the inner io from the registration and returns a Result containing the inner io. 127 #[cfg(any(feature = "net", feature = "process"))] into_inner(mut self) -> io::Result<E>128 pub(crate) fn into_inner(mut self) -> io::Result<E> { 129 let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here. 130 self.registration.deregister(&mut inner)?; 131 Ok(inner) 132 } 133 } 134 135 feature! { 136 #![any(feature = "net", feature = "process")] 137 138 use crate::io::ReadBuf; 139 use std::task::{Context, Poll}; 140 141 impl<E: Source> PollEvented<E> { 142 // Safety: The caller must ensure that `E` can read into uninitialized memory 143 pub(crate) unsafe fn poll_read<'a>( 144 &'a self, 145 cx: &mut Context<'_>, 146 buf: &mut ReadBuf<'_>, 147 ) -> Poll<io::Result<()>> 148 where 149 &'a E: io::Read + 'a, 150 { 151 use std::io::Read; 152 153 let n = ready!(self.registration.poll_read_io(cx, || { 154 let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]); 155 self.io.as_ref().unwrap().read(b) 156 }))?; 157 158 // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the 159 // buffer. 160 buf.assume_init(n); 161 buf.advance(n); 162 Poll::Ready(Ok(())) 163 } 164 165 pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> 166 where 167 &'a E: io::Write + 'a, 168 { 169 use std::io::Write; 170 self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf)) 171 } 172 173 #[cfg(feature = "net")] 174 pub(crate) fn poll_write_vectored<'a>( 175 &'a self, 176 cx: &mut Context<'_>, 177 bufs: &[io::IoSlice<'_>], 178 ) -> Poll<io::Result<usize>> 179 where 180 &'a E: io::Write + 'a, 181 { 182 use std::io::Write; 183 self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs)) 184 } 185 } 186 } 187 188 impl<E: Source> Deref for PollEvented<E> { 189 type Target = E; 190 deref(&self) -> &E191 fn deref(&self) -> &E { 192 self.io.as_ref().unwrap() 193 } 194 } 195 196 impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result197 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 198 f.debug_struct("PollEvented").field("io", &self.io).finish() 199 } 200 } 201 202 impl<E: Source> Drop for PollEvented<E> { drop(&mut self)203 fn drop(&mut self) { 204 if let Some(mut io) = self.io.take() { 205 // Ignore errors 206 let _ = self.registration.deregister(&mut io); 207 } 208 } 209 } 210