• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::io::interest::Interest;
2 use crate::runtime::io::Registration;
3 use crate::runtime::scheduler;
4 
5 use mio::event::Source;
6 use std::fmt;
7 use std::io;
8 use std::ops::Deref;
9 use std::panic::{RefUnwindSafe, UnwindSafe};
10 
cfg_io_driver! {
    /// Associates an I/O resource that implements the [`std::io::Read`] and/or
    /// [`std::io::Write`] traits with the reactor that drives it.
    ///
    /// `PollEvented` uses [`Registration`] internally to take a type that
    /// implements [`mio::event::Source`] as well as [`std::io::Read`] and/or
    /// [`std::io::Write`] and associate it with a reactor that will drive it.
    ///
    /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be
    /// used from within the future's execution model. As such, the
    /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
    /// implementations using the underlying I/O resource as well as readiness
    /// events provided by the reactor.
    ///
    /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is
    /// `Sync`), the caller must ensure that there are at most two tasks that
    /// use a `PollEvented` instance concurrently. One for reading and one for
    /// writing. While violating this requirement is "safe" from a Rust memory
    /// model point of view, it will result in unexpected behavior in the form
    /// of lost notifications and tasks hanging.
    ///
    /// ## Readiness events
    ///
    /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations,
    /// this type also supports access to the underlying readiness event stream.
    /// While similar in function to what [`Registration`] provides, the
    /// semantics are a bit different.
    ///
    /// Two functions are provided to access the readiness events:
    /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the
    /// current readiness state of the `PollEvented` instance. If
    /// [`poll_read_ready`] indicates read readiness, immediately calling
    /// [`poll_read_ready`] again will also indicate read readiness.
    ///
    /// When the operation is attempted and is unable to succeed due to the I/O
    /// resource not being ready, the caller must call [`clear_readiness`].
    /// This clears the readiness state until a new readiness event is received.
    ///
    /// This allows the caller to implement additional functions. For example,
    /// [`TcpListener`] implements `poll_accept` by using [`poll_read_ready`] and
    /// [`clear_readiness`].
    ///
    /// ## Platform-specific events
    ///
    /// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
    /// These events are included as part of the read readiness event stream. The
    /// write readiness event stream is only for `Ready::writable()` events.
    ///
    /// [`AsyncRead`]: crate::io::AsyncRead
    /// [`AsyncWrite`]: crate::io::AsyncWrite
    /// [`TcpListener`]: crate::net::TcpListener
    /// [`clear_readiness`]: Registration::clear_readiness
    /// [`poll_read_ready`]: Registration::poll_read_ready
    /// [`poll_write_ready`]: Registration::poll_write_ready
    pub(crate) struct PollEvented<E: Source> {
        /// The wrapped I/O resource.
        ///
        /// Stored as an `Option` so that `into_inner` and `Drop` can move the
        /// resource out with `take()`; every other accessor in this file
        /// unwraps it and therefore expects it to be `Some`.
        io: Option<E>,
        /// Registration of `io` with the I/O driver. Readiness polling,
        /// readiness clearing and deregistration all go through this handle.
        registration: Registration,
    }
}
70 
71 // ===== impl PollEvented =====
72 
73 impl<E: Source> PollEvented<E> {
74     /// Creates a new `PollEvented` associated with the default reactor.
75     ///
76     /// The returned `PollEvented` has readable and writable interests. For more control, use
77     /// [`Self::new_with_interest`].
78     ///
79     /// # Panics
80     ///
81     /// This function panics if thread-local runtime is not set.
82     ///
83     /// The runtime is usually set implicitly when this function is called
84     /// from a future driven by a tokio runtime, otherwise runtime can be set
85     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
86     #[track_caller]
87     #[cfg_attr(feature = "signal", allow(unused))]
new(io: E) -> io::Result<Self>88     pub(crate) fn new(io: E) -> io::Result<Self> {
89         PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE)
90     }
91 
92     /// Creates a new `PollEvented` associated with the default reactor, for
93     /// specific `Interest` state. `new_with_interest` should be used over `new`
94     /// when you need control over the readiness state, such as when a file
95     /// descriptor only allows reads. This does not add `hup` or `error` so if
96     /// you are interested in those states, you will need to add them to the
97     /// readiness state passed to this function.
98     ///
99     /// # Panics
100     ///
101     /// This function panics if thread-local runtime is not set.
102     ///
103     /// The runtime is usually set implicitly when this function is called from
104     /// a future driven by a tokio runtime, otherwise runtime can be set
105     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter)
106     /// function.
107     #[track_caller]
108     #[cfg_attr(feature = "signal", allow(unused))]
new_with_interest(io: E, interest: Interest) -> io::Result<Self>109     pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
110         Self::new_with_interest_and_handle(io, interest, scheduler::Handle::current())
111     }
112 
113     #[track_caller]
new_with_interest_and_handle( mut io: E, interest: Interest, handle: scheduler::Handle, ) -> io::Result<Self>114     pub(crate) fn new_with_interest_and_handle(
115         mut io: E,
116         interest: Interest,
117         handle: scheduler::Handle,
118     ) -> io::Result<Self> {
119         let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
120         Ok(Self {
121             io: Some(io),
122             registration,
123         })
124     }
125 
126     /// Returns a reference to the registration.
127     #[cfg(feature = "net")]
registration(&self) -> &Registration128     pub(crate) fn registration(&self) -> &Registration {
129         &self.registration
130     }
131 
132     /// Deregisters the inner io from the registration and returns a Result containing the inner io.
133     #[cfg(any(feature = "net", feature = "process"))]
into_inner(mut self) -> io::Result<E>134     pub(crate) fn into_inner(mut self) -> io::Result<E> {
135         let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
136         self.registration.deregister(&mut inner)?;
137         Ok(inner)
138     }
139 }
140 
141 feature! {
142     #![any(feature = "net", all(unix, feature = "process"))]
143 
144     use crate::io::ReadBuf;
145     use std::task::{Context, Poll};
146 
147     impl<E: Source> PollEvented<E> {
148         // Safety: The caller must ensure that `E` can read into uninitialized memory
149         pub(crate) unsafe fn poll_read<'a>(
150             &'a self,
151             cx: &mut Context<'_>,
152             buf: &mut ReadBuf<'_>,
153         ) -> Poll<io::Result<()>>
154         where
155             &'a E: io::Read + 'a,
156         {
157             use std::io::Read;
158 
159             loop {
160                 let evt = ready!(self.registration.poll_read_ready(cx))?;
161 
162                 let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
163                 let len = b.len();
164 
165                 match self.io.as_ref().unwrap().read(b) {
166                     Ok(n) => {
167                         // if we read a partially full buffer, this is sufficient on unix to show
168                         // that the socket buffer has been drained.  Unfortunately this assumption
169                         // fails for level-triggered selectors (like on Windows or poll even for
170                         // UNIX): https://github.com/tokio-rs/tokio/issues/5866
171                         if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < len) {
172                             self.registration.clear_readiness(evt);
173                         }
174 
175                         // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
176                         // buffer.
177                         buf.assume_init(n);
178                         buf.advance(n);
179                         return Poll::Ready(Ok(()));
180                     },
181                     Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
182                         self.registration.clear_readiness(evt);
183                     }
184                     Err(e) => return Poll::Ready(Err(e)),
185                 }
186             }
187         }
188 
189         pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
190         where
191             &'a E: io::Write + 'a,
192         {
193             use std::io::Write;
194 
195             loop {
196                 let evt = ready!(self.registration.poll_write_ready(cx))?;
197 
198                 match self.io.as_ref().unwrap().write(buf) {
199                     Ok(n) => {
200                         // if we write only part of our buffer, this is sufficient on unix to show
201                         // that the socket buffer is full.  Unfortunately this assumption
202                         // fails for level-triggered selectors (like on Windows or poll even for
203                         // UNIX): https://github.com/tokio-rs/tokio/issues/5866
204                         if n > 0 && (!cfg!(windows) && !cfg!(mio_unsupported_force_poll_poll) && n < buf.len()) {
205                             self.registration.clear_readiness(evt);
206                         }
207 
208                         return Poll::Ready(Ok(n));
209                     },
210                     Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
211                         self.registration.clear_readiness(evt);
212                     }
213                     Err(e) => return Poll::Ready(Err(e)),
214                 }
215             }
216         }
217 
218         #[cfg(any(feature = "net", feature = "process"))]
219         pub(crate) fn poll_write_vectored<'a>(
220             &'a self,
221             cx: &mut Context<'_>,
222             bufs: &[io::IoSlice<'_>],
223         ) -> Poll<io::Result<usize>>
224         where
225             &'a E: io::Write + 'a,
226         {
227             use std::io::Write;
228             self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs))
229         }
230     }
231 }
232 
// Marker impl: `PollEvented<E>` is declared unwind safe for every
// `E: Source`, without requiring `E` itself to be `UnwindSafe`.
impl<E: Source> UnwindSafe for PollEvented<E> {}

// Marker impl: the same declaration for shared references, again with no
// extra bound on `E`.
impl<E: Source> RefUnwindSafe for PollEvented<E> {}
236 
237 impl<E: Source> Deref for PollEvented<E> {
238     type Target = E;
239 
deref(&self) -> &E240     fn deref(&self) -> &E {
241         self.io.as_ref().unwrap()
242     }
243 }
244 
245 impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result246     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
247         f.debug_struct("PollEvented").field("io", &self.io).finish()
248     }
249 }
250 
251 impl<E: Source> Drop for PollEvented<E> {
drop(&mut self)252     fn drop(&mut self) {
253         if let Some(mut io) = self.io.take() {
254             // Ignore errors
255             let _ = self.registration.deregister(&mut io);
256         }
257     }
258 }
259