• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::io::driver::{Handle, Interest, Registration};
2 
3 use mio::event::Source;
4 use std::fmt;
5 use std::io;
6 use std::ops::Deref;
7 
8 cfg_io_driver! {
9     /// Associates an I/O resource that implements the [`std::io::Read`] and/or
10     /// [`std::io::Write`] traits with the reactor that drives it.
11     ///
12     /// `PollEvented` uses [`Registration`] internally to take a type that
13     /// implements [`mio::event::Source`] as well as [`std::io::Read`] and or
14     /// [`std::io::Write`] and associate it with a reactor that will drive it.
15     ///
16     /// Once the [`mio::event::Source`] type is wrapped by `PollEvented`, it can be
17     /// used from within the future's execution model. As such, the
18     /// `PollEvented` type provides [`AsyncRead`] and [`AsyncWrite`]
19     /// implementations using the underlying I/O resource as well as readiness
20     /// events provided by the reactor.
21     ///
22     /// **Note**: While `PollEvented` is `Sync` (if the underlying I/O type is
23     /// `Sync`), the caller must ensure that there are at most two tasks that
24     /// use a `PollEvented` instance concurrently. One for reading and one for
25     /// writing. While violating this requirement is "safe" from a Rust memory
26     /// model point of view, it will result in unexpected behavior in the form
27     /// of lost notifications and tasks hanging.
28     ///
29     /// ## Readiness events
30     ///
31     /// Besides just providing [`AsyncRead`] and [`AsyncWrite`] implementations,
32     /// this type also supports access to the underlying readiness event stream.
33     /// While similar in function to what [`Registration`] provides, the
34     /// semantics are a bit different.
35     ///
36     /// Two functions are provided to access the readiness events:
37     /// [`poll_read_ready`] and [`poll_write_ready`]. These functions return the
38     /// current readiness state of the `PollEvented` instance. If
39     /// [`poll_read_ready`] indicates read readiness, immediately calling
40     /// [`poll_read_ready`] again will also indicate read readiness.
41     ///
42     /// When the operation is attempted and is unable to succeed due to the I/O
43     /// resource not being ready, the caller must call `clear_readiness`.
44     /// This clears the readiness state until a new readiness event is received.
45     ///
46     /// This allows the caller to implement additional functions. For example,
47     /// [`TcpListener`] implements poll_accept by using [`poll_read_ready`] and
48     /// `clear_read_ready`.
49     ///
50     /// ## Platform-specific events
51     ///
52     /// `PollEvented` also allows receiving platform-specific `mio::Ready` events.
53     /// These events are included as part of the read readiness event stream. The
54     /// write readiness event stream is only for `Ready::writable()` events.
55     ///
56     /// [`AsyncRead`]: crate::io::AsyncRead
57     /// [`AsyncWrite`]: crate::io::AsyncWrite
58     /// [`TcpListener`]: crate::net::TcpListener
59     /// [`poll_read_ready`]: Registration::poll_read_ready
60     /// [`poll_write_ready`]: Registration::poll_write_ready
61     pub(crate) struct PollEvented<E: Source> {
62         io: Option<E>,
63         registration: Registration,
64     }
65 }
66 
67 // ===== impl PollEvented =====
68 
69 impl<E: Source> PollEvented<E> {
70     /// Creates a new `PollEvented` associated with the default reactor.
71     ///
72     /// # Panics
73     ///
74     /// This function panics if thread-local runtime is not set.
75     ///
76     /// The runtime is usually set implicitly when this function is called
77     /// from a future driven by a tokio runtime, otherwise runtime can be set
78     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
79     #[cfg_attr(feature = "signal", allow(unused))]
new(io: E) -> io::Result<Self>80     pub(crate) fn new(io: E) -> io::Result<Self> {
81         PollEvented::new_with_interest(io, Interest::READABLE | Interest::WRITABLE)
82     }
83 
84     /// Creates a new `PollEvented` associated with the default reactor, for
85     /// specific `Interest` state. `new_with_interest` should be used over `new`
86     /// when you need control over the readiness state, such as when a file
87     /// descriptor only allows reads. This does not add `hup` or `error` so if
88     /// you are interested in those states, you will need to add them to the
89     /// readiness state passed to this function.
90     ///
91     /// # Panics
92     ///
93     /// This function panics if thread-local runtime is not set.
94     ///
95     /// The runtime is usually set implicitly when this function is called from
96     /// a future driven by a tokio runtime, otherwise runtime can be set
97     /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter)
98     /// function.
99     #[cfg_attr(feature = "signal", allow(unused))]
new_with_interest(io: E, interest: Interest) -> io::Result<Self>100     pub(crate) fn new_with_interest(io: E, interest: Interest) -> io::Result<Self> {
101         Self::new_with_interest_and_handle(io, interest, Handle::current())
102     }
103 
new_with_interest_and_handle( mut io: E, interest: Interest, handle: Handle, ) -> io::Result<Self>104     pub(crate) fn new_with_interest_and_handle(
105         mut io: E,
106         interest: Interest,
107         handle: Handle,
108     ) -> io::Result<Self> {
109         let registration = Registration::new_with_interest_and_handle(&mut io, interest, handle)?;
110         Ok(Self {
111             io: Some(io),
112             registration,
113         })
114     }
115 
116     /// Returns a reference to the registration.
117     #[cfg(any(
118         feature = "net",
119         all(unix, feature = "process"),
120         all(unix, feature = "signal"),
121     ))]
registration(&self) -> &Registration122     pub(crate) fn registration(&self) -> &Registration {
123         &self.registration
124     }
125 
126     /// Deregisters the inner io from the registration and returns a Result containing the inner io.
127     #[cfg(any(feature = "net", feature = "process"))]
into_inner(mut self) -> io::Result<E>128     pub(crate) fn into_inner(mut self) -> io::Result<E> {
129         let mut inner = self.io.take().unwrap(); // As io shouldn't ever be None, just unwrap here.
130         self.registration.deregister(&mut inner)?;
131         Ok(inner)
132     }
133 }
134 
135 feature! {
136     #![any(feature = "net", feature = "process")]
137 
138     use crate::io::ReadBuf;
139     use std::task::{Context, Poll};
140 
141     impl<E: Source> PollEvented<E> {
142         // Safety: The caller must ensure that `E` can read into uninitialized memory
143         pub(crate) unsafe fn poll_read<'a>(
144             &'a self,
145             cx: &mut Context<'_>,
146             buf: &mut ReadBuf<'_>,
147         ) -> Poll<io::Result<()>>
148         where
149             &'a E: io::Read + 'a,
150         {
151             use std::io::Read;
152 
153             let n = ready!(self.registration.poll_read_io(cx, || {
154                 let b = &mut *(buf.unfilled_mut() as *mut [std::mem::MaybeUninit<u8>] as *mut [u8]);
155                 self.io.as_ref().unwrap().read(b)
156             }))?;
157 
158             // Safety: We trust `TcpStream::read` to have filled up `n` bytes in the
159             // buffer.
160             buf.assume_init(n);
161             buf.advance(n);
162             Poll::Ready(Ok(()))
163         }
164 
165         pub(crate) fn poll_write<'a>(&'a self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>>
166         where
167             &'a E: io::Write + 'a,
168         {
169             use std::io::Write;
170             self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf))
171         }
172 
173         #[cfg(feature = "net")]
174         pub(crate) fn poll_write_vectored<'a>(
175             &'a self,
176             cx: &mut Context<'_>,
177             bufs: &[io::IoSlice<'_>],
178         ) -> Poll<io::Result<usize>>
179         where
180             &'a E: io::Write + 'a,
181         {
182             use std::io::Write;
183             self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs))
184         }
185     }
186 }
187 
188 impl<E: Source> Deref for PollEvented<E> {
189     type Target = E;
190 
deref(&self) -> &E191     fn deref(&self) -> &E {
192         self.io.as_ref().unwrap()
193     }
194 }
195 
196 impl<E: Source + fmt::Debug> fmt::Debug for PollEvented<E> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result197     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198         f.debug_struct("PollEvented").field("io", &self.io).finish()
199     }
200 }
201 
202 impl<E: Source> Drop for PollEvented<E> {
drop(&mut self)203     fn drop(&mut self) {
204         if let Some(mut io) = self.io.take() {
205             // Ignore errors
206             let _ = self.registration.deregister(&mut io);
207         }
208     }
209 }
210