• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::io::{Interest, Ready};
2 use crate::runtime::io::{ReadyEvent, Registration};
3 use crate::runtime::scheduler;
4 
5 use mio::unix::SourceFd;
6 use std::io;
7 use std::os::unix::io::{AsRawFd, RawFd};
8 use std::{task::Context, task::Poll};
9 
10 /// Associates an IO object backed by a Unix file descriptor with the tokio
11 /// reactor, allowing for readiness to be polled. The file descriptor must be of
12 /// a type that can be used with the OS polling facilities (ie, `poll`, `epoll`,
13 /// `kqueue`, etc), such as a network socket or pipe, and the file descriptor
14 /// must have the nonblocking mode set to true.
15 ///
16 /// Creating an AsyncFd registers the file descriptor with the current tokio
17 /// Reactor, allowing you to directly await the file descriptor being readable
18 /// or writable. Once registered, the file descriptor remains registered until
19 /// the AsyncFd is dropped.
20 ///
21 /// The AsyncFd takes ownership of an arbitrary object to represent the IO
22 /// object. It is intended that this object will handle closing the file
23 /// descriptor when it is dropped, avoiding resource leaks and ensuring that the
24 /// AsyncFd can clean up the registration before closing the file descriptor.
25 /// The [`AsyncFd::into_inner`] function can be used to extract the inner object
26 /// to retake control from the tokio IO reactor.
27 ///
28 /// The inner object is required to implement [`AsRawFd`]. This file descriptor
29 /// must not change while [`AsyncFd`] owns the inner object, i.e. the
30 /// [`AsRawFd::as_raw_fd`] method on the inner type must always return the same
31 /// file descriptor when called multiple times. Failure to uphold this results
32 /// in unspecified behavior in the IO driver, which may include breaking
33 /// notifications for other sockets/etc.
34 ///
35 /// Polling for readiness is done by calling the async functions [`readable`]
36 /// and [`writable`]. These functions complete when the associated readiness
37 /// condition is observed. Any number of tasks can query the same `AsyncFd` in
38 /// parallel, on the same or different conditions.
39 ///
40 /// On some platforms, the readiness detecting mechanism relies on
41 /// edge-triggered notifications. This means that the OS will only notify Tokio
42 /// when the file descriptor transitions from not-ready to ready. For this to
43 /// work you should first try to read or write and only poll for readiness
44 /// if that fails with an error of [`std::io::ErrorKind::WouldBlock`].
45 ///
46 /// Tokio internally tracks when it has received a ready notification, and when
47 /// readiness checking functions like [`readable`] and [`writable`] are called,
48 /// if the readiness flag is set, these async functions will complete
49 /// immediately. This however does mean that it is critical to ensure that this
50 /// ready flag is cleared when (and only when) the file descriptor ceases to be
51 /// ready. The [`AsyncFdReadyGuard`] returned from readiness checking functions
52 /// serves this function; after calling a readiness-checking async function,
53 /// you must use this [`AsyncFdReadyGuard`] to signal to tokio whether the file
54 /// descriptor is no longer in a ready state.
55 ///
56 /// ## Use with a poll-based API
57 ///
58 /// In some cases it may be desirable to use `AsyncFd` from APIs similar to
59 /// [`TcpStream::poll_read_ready`]. The [`AsyncFd::poll_read_ready`] and
60 /// [`AsyncFd::poll_write_ready`] functions are provided for this purpose.
61 /// Because these functions don't create a future to hold their state, they have
62 /// the limitation that only one task can wait on each direction (read or write)
63 /// at a time.
64 ///
65 /// # Examples
66 ///
67 /// This example shows how to turn [`std::net::TcpStream`] asynchronous using
68 /// `AsyncFd`.  It implements the read/write operations both as an `async fn`
69 /// and using the IO traits [`AsyncRead`] and [`AsyncWrite`].
70 ///
71 /// ```no_run
72 /// use futures::ready;
73 /// use std::io::{self, Read, Write};
74 /// use std::net::TcpStream;
75 /// use std::pin::Pin;
76 /// use std::task::{Context, Poll};
77 /// use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
78 /// use tokio::io::unix::AsyncFd;
79 ///
80 /// pub struct AsyncTcpStream {
81 ///     inner: AsyncFd<TcpStream>,
82 /// }
83 ///
84 /// impl AsyncTcpStream {
85 ///     pub fn new(tcp: TcpStream) -> io::Result<Self> {
86 ///         tcp.set_nonblocking(true)?;
87 ///         Ok(Self {
88 ///             inner: AsyncFd::new(tcp)?,
89 ///         })
90 ///     }
91 ///
92 ///     pub async fn read(&self, out: &mut [u8]) -> io::Result<usize> {
93 ///         loop {
94 ///             let mut guard = self.inner.readable().await?;
95 ///
96 ///             match guard.try_io(|inner| inner.get_ref().read(out)) {
97 ///                 Ok(result) => return result,
98 ///                 Err(_would_block) => continue,
99 ///             }
100 ///         }
101 ///     }
102 ///
103 ///     pub async fn write(&self, buf: &[u8]) -> io::Result<usize> {
104 ///         loop {
105 ///             let mut guard = self.inner.writable().await?;
106 ///
107 ///             match guard.try_io(|inner| inner.get_ref().write(buf)) {
108 ///                 Ok(result) => return result,
109 ///                 Err(_would_block) => continue,
110 ///             }
111 ///         }
112 ///     }
113 /// }
114 ///
115 /// impl AsyncRead for AsyncTcpStream {
116 ///     fn poll_read(
117 ///         self: Pin<&mut Self>,
118 ///         cx: &mut Context<'_>,
119 ///         buf: &mut ReadBuf<'_>
120 ///     ) -> Poll<io::Result<()>> {
121 ///         loop {
122 ///             let mut guard = ready!(self.inner.poll_read_ready(cx))?;
123 ///
124 ///             let unfilled = buf.initialize_unfilled();
125 ///             match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
126 ///                 Ok(Ok(len)) => {
127 ///                     buf.advance(len);
128 ///                     return Poll::Ready(Ok(()));
129 ///                 },
130 ///                 Ok(Err(err)) => return Poll::Ready(Err(err)),
131 ///                 Err(_would_block) => continue,
132 ///             }
133 ///         }
134 ///     }
135 /// }
136 ///
137 /// impl AsyncWrite for AsyncTcpStream {
138 ///     fn poll_write(
139 ///         self: Pin<&mut Self>,
140 ///         cx: &mut Context<'_>,
141 ///         buf: &[u8]
142 ///     ) -> Poll<io::Result<usize>> {
143 ///         loop {
144 ///             let mut guard = ready!(self.inner.poll_write_ready(cx))?;
145 ///
146 ///             match guard.try_io(|inner| inner.get_ref().write(buf)) {
147 ///                 Ok(result) => return Poll::Ready(result),
148 ///                 Err(_would_block) => continue,
149 ///             }
150 ///         }
151 ///     }
152 ///
153 ///     fn poll_flush(
154 ///         self: Pin<&mut Self>,
155 ///         cx: &mut Context<'_>,
156 ///     ) -> Poll<io::Result<()>> {
157 ///         // tcp flush is a no-op
158 ///         Poll::Ready(Ok(()))
159 ///     }
160 ///
161 ///     fn poll_shutdown(
162 ///         self: Pin<&mut Self>,
163 ///         cx: &mut Context<'_>,
164 ///     ) -> Poll<io::Result<()>> {
165 ///         self.inner.get_ref().shutdown(std::net::Shutdown::Write)?;
166 ///         Poll::Ready(Ok(()))
167 ///     }
168 /// }
169 /// ```
170 ///
171 /// [`readable`]: method@Self::readable
172 /// [`writable`]: method@Self::writable
173 /// [`AsyncFdReadyGuard`]: struct@self::AsyncFdReadyGuard
174 /// [`TcpStream::poll_read_ready`]: struct@crate::net::TcpStream
175 /// [`AsyncRead`]: trait@crate::io::AsyncRead
176 /// [`AsyncWrite`]: trait@crate::io::AsyncWrite
177 pub struct AsyncFd<T: AsRawFd> {
178     registration: Registration,
179     // The inner value is always present. the Option is required for `drop` and `into_inner`.
180     // In all other methods `unwrap` is valid, and will never panic.
181     inner: Option<T>,
182 }
183 
184 /// Represents an IO-ready event detected on a particular file descriptor that
185 /// has not yet been acknowledged. This is a `must_use` structure to help ensure
186 /// that you do not forget to explicitly clear (or not clear) the event.
187 ///
188 /// This type exposes an immutable reference to the underlying IO object.
189 #[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
190 pub struct AsyncFdReadyGuard<'a, T: AsRawFd> {
191     async_fd: &'a AsyncFd<T>,
192     event: Option<ReadyEvent>,
193 }
194 
195 /// Represents an IO-ready event detected on a particular file descriptor that
196 /// has not yet been acknowledged. This is a `must_use` structure to help ensure
197 /// that you do not forget to explicitly clear (or not clear) the event.
198 ///
199 /// This type exposes a mutable reference to the underlying IO object.
200 #[must_use = "You must explicitly choose whether to clear the readiness state by calling a method on ReadyGuard"]
201 pub struct AsyncFdReadyMutGuard<'a, T: AsRawFd> {
202     async_fd: &'a mut AsyncFd<T>,
203     event: Option<ReadyEvent>,
204 }
205 
206 impl<T: AsRawFd> AsyncFd<T> {
207     /// Creates an AsyncFd backed by (and taking ownership of) an object
208     /// implementing [`AsRawFd`]. The backing file descriptor is cached at the
209     /// time of creation.
210     ///
211     /// Only configures the [`Interest::READABLE`] and [`Interest::WRITABLE`] interests. For more
212     /// control, use [`AsyncFd::with_interest`].
213     ///
214     /// This method must be called in the context of a tokio runtime.
215     ///
216     /// # Panics
217     ///
218     /// This function panics if there is no current reactor set, or if the `rt`
219     /// feature flag is not enabled.
220     #[inline]
221     #[track_caller]
new(inner: T) -> io::Result<Self> where T: AsRawFd,222     pub fn new(inner: T) -> io::Result<Self>
223     where
224         T: AsRawFd,
225     {
226         Self::with_interest(inner, Interest::READABLE | Interest::WRITABLE)
227     }
228 
229     /// Creates an AsyncFd backed by (and taking ownership of) an object
230     /// implementing [`AsRawFd`], with a specific [`Interest`]. The backing
231     /// file descriptor is cached at the time of creation.
232     ///
233     /// # Panics
234     ///
235     /// This function panics if there is no current reactor set, or if the `rt`
236     /// feature flag is not enabled.
237     #[inline]
238     #[track_caller]
with_interest(inner: T, interest: Interest) -> io::Result<Self> where T: AsRawFd,239     pub fn with_interest(inner: T, interest: Interest) -> io::Result<Self>
240     where
241         T: AsRawFd,
242     {
243         Self::new_with_handle_and_interest(inner, scheduler::Handle::current(), interest)
244     }
245 
246     #[track_caller]
new_with_handle_and_interest( inner: T, handle: scheduler::Handle, interest: Interest, ) -> io::Result<Self>247     pub(crate) fn new_with_handle_and_interest(
248         inner: T,
249         handle: scheduler::Handle,
250         interest: Interest,
251     ) -> io::Result<Self> {
252         let fd = inner.as_raw_fd();
253 
254         let registration =
255             Registration::new_with_interest_and_handle(&mut SourceFd(&fd), interest, handle)?;
256 
257         Ok(AsyncFd {
258             registration,
259             inner: Some(inner),
260         })
261     }
262 
263     /// Returns a shared reference to the backing object of this [`AsyncFd`].
264     #[inline]
get_ref(&self) -> &T265     pub fn get_ref(&self) -> &T {
266         self.inner.as_ref().unwrap()
267     }
268 
269     /// Returns a mutable reference to the backing object of this [`AsyncFd`].
270     #[inline]
get_mut(&mut self) -> &mut T271     pub fn get_mut(&mut self) -> &mut T {
272         self.inner.as_mut().unwrap()
273     }
274 
take_inner(&mut self) -> Option<T>275     fn take_inner(&mut self) -> Option<T> {
276         let inner = self.inner.take()?;
277         let fd = inner.as_raw_fd();
278 
279         let _ = self.registration.deregister(&mut SourceFd(&fd));
280 
281         Some(inner)
282     }
283 
284     /// Deregisters this file descriptor and returns ownership of the backing
285     /// object.
into_inner(mut self) -> T286     pub fn into_inner(mut self) -> T {
287         self.take_inner().unwrap()
288     }
289 
290     /// Polls for read readiness.
291     ///
292     /// If the file descriptor is not currently ready for reading, this method
293     /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
294     /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
295     ///
296     /// Note that on multiple calls to [`poll_read_ready`] or
297     /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
298     /// most recent call is scheduled to receive a wakeup. (However,
299     /// [`poll_write_ready`] retains a second, independent waker).
300     ///
301     /// This method is intended for cases where creating and pinning a future
302     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
303     /// preferred, as this supports polling from multiple tasks at once.
304     ///
305     /// This method takes `&self`, so it is possible to call this method
306     /// concurrently with other methods on this struct. This method only
307     /// provides shared access to the inner IO resource when handling the
308     /// [`AsyncFdReadyGuard`].
309     ///
310     /// [`poll_read_ready`]: method@Self::poll_read_ready
311     /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
312     /// [`poll_write_ready`]: method@Self::poll_write_ready
313     /// [`readable`]: method@Self::readable
314     /// [`Context`]: struct@std::task::Context
315     /// [`Waker`]: struct@std::task::Waker
316     /// [`Waker::wake`]: method@std::task::Waker::wake
poll_read_ready<'a>( &'a self, cx: &mut Context<'_>, ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>>317     pub fn poll_read_ready<'a>(
318         &'a self,
319         cx: &mut Context<'_>,
320     ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
321         let event = ready!(self.registration.poll_read_ready(cx))?;
322 
323         Poll::Ready(Ok(AsyncFdReadyGuard {
324             async_fd: self,
325             event: Some(event),
326         }))
327     }
328 
329     /// Polls for read readiness.
330     ///
331     /// If the file descriptor is not currently ready for reading, this method
332     /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
333     /// file descriptor becomes ready for reading, [`Waker::wake`] will be called.
334     ///
335     /// Note that on multiple calls to [`poll_read_ready`] or
336     /// [`poll_read_ready_mut`], only the `Waker` from the `Context` passed to the
337     /// most recent call is scheduled to receive a wakeup. (However,
338     /// [`poll_write_ready`] retains a second, independent waker).
339     ///
340     /// This method is intended for cases where creating and pinning a future
341     /// via [`readable`] is not feasible. Where possible, using [`readable`] is
342     /// preferred, as this supports polling from multiple tasks at once.
343     ///
344     /// This method takes `&mut self`, so it is possible to access the inner IO
345     /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
346     ///
347     /// [`poll_read_ready`]: method@Self::poll_read_ready
348     /// [`poll_read_ready_mut`]: method@Self::poll_read_ready_mut
349     /// [`poll_write_ready`]: method@Self::poll_write_ready
350     /// [`readable`]: method@Self::readable
351     /// [`Context`]: struct@std::task::Context
352     /// [`Waker`]: struct@std::task::Waker
353     /// [`Waker::wake`]: method@std::task::Waker::wake
poll_read_ready_mut<'a>( &'a mut self, cx: &mut Context<'_>, ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>>354     pub fn poll_read_ready_mut<'a>(
355         &'a mut self,
356         cx: &mut Context<'_>,
357     ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
358         let event = ready!(self.registration.poll_read_ready(cx))?;
359 
360         Poll::Ready(Ok(AsyncFdReadyMutGuard {
361             async_fd: self,
362             event: Some(event),
363         }))
364     }
365 
366     /// Polls for write readiness.
367     ///
368     /// If the file descriptor is not currently ready for writing, this method
369     /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
370     /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
371     ///
372     /// Note that on multiple calls to [`poll_write_ready`] or
373     /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
374     /// most recent call is scheduled to receive a wakeup. (However,
375     /// [`poll_read_ready`] retains a second, independent waker).
376     ///
377     /// This method is intended for cases where creating and pinning a future
378     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
379     /// preferred, as this supports polling from multiple tasks at once.
380     ///
381     /// This method takes `&self`, so it is possible to call this method
382     /// concurrently with other methods on this struct. This method only
383     /// provides shared access to the inner IO resource when handling the
384     /// [`AsyncFdReadyGuard`].
385     ///
386     /// [`poll_read_ready`]: method@Self::poll_read_ready
387     /// [`poll_write_ready`]: method@Self::poll_write_ready
388     /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
389     /// [`writable`]: method@Self::writable
390     /// [`Context`]: struct@std::task::Context
391     /// [`Waker`]: struct@std::task::Waker
392     /// [`Waker::wake`]: method@std::task::Waker::wake
poll_write_ready<'a>( &'a self, cx: &mut Context<'_>, ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>>393     pub fn poll_write_ready<'a>(
394         &'a self,
395         cx: &mut Context<'_>,
396     ) -> Poll<io::Result<AsyncFdReadyGuard<'a, T>>> {
397         let event = ready!(self.registration.poll_write_ready(cx))?;
398 
399         Poll::Ready(Ok(AsyncFdReadyGuard {
400             async_fd: self,
401             event: Some(event),
402         }))
403     }
404 
405     /// Polls for write readiness.
406     ///
407     /// If the file descriptor is not currently ready for writing, this method
408     /// will store a clone of the [`Waker`] from the provided [`Context`]. When the
409     /// file descriptor becomes ready for writing, [`Waker::wake`] will be called.
410     ///
411     /// Note that on multiple calls to [`poll_write_ready`] or
412     /// [`poll_write_ready_mut`], only the `Waker` from the `Context` passed to the
413     /// most recent call is scheduled to receive a wakeup. (However,
414     /// [`poll_read_ready`] retains a second, independent waker).
415     ///
416     /// This method is intended for cases where creating and pinning a future
417     /// via [`writable`] is not feasible. Where possible, using [`writable`] is
418     /// preferred, as this supports polling from multiple tasks at once.
419     ///
420     /// This method takes `&mut self`, so it is possible to access the inner IO
421     /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
422     ///
423     /// [`poll_read_ready`]: method@Self::poll_read_ready
424     /// [`poll_write_ready`]: method@Self::poll_write_ready
425     /// [`poll_write_ready_mut`]: method@Self::poll_write_ready_mut
426     /// [`writable`]: method@Self::writable
427     /// [`Context`]: struct@std::task::Context
428     /// [`Waker`]: struct@std::task::Waker
429     /// [`Waker::wake`]: method@std::task::Waker::wake
poll_write_ready_mut<'a>( &'a mut self, cx: &mut Context<'_>, ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>>430     pub fn poll_write_ready_mut<'a>(
431         &'a mut self,
432         cx: &mut Context<'_>,
433     ) -> Poll<io::Result<AsyncFdReadyMutGuard<'a, T>>> {
434         let event = ready!(self.registration.poll_write_ready(cx))?;
435 
436         Poll::Ready(Ok(AsyncFdReadyMutGuard {
437             async_fd: self,
438             event: Some(event),
439         }))
440     }
441 
442     /// Waits for any of the requested ready states, returning a
443     /// [`AsyncFdReadyGuard`] that must be dropped to resume
444     /// polling for the requested ready states.
445     ///
446     /// The function may complete without the file descriptor being ready. This is a
447     /// false-positive and attempting an operation will return with
448     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
449     /// [`Ready`] set, so you should always check the returned value and possibly
450     /// wait again if the requested states are not set.
451     ///
452     /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
453     /// When a combined interest is used, it is important to clear only the readiness
454     /// that is actually observed to block. For instance when the combined
455     /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
456     /// read readiness should be cleared using the [`AsyncFdReadyGuard::clear_ready_matching`] method:
457     /// `guard.clear_ready_matching(Ready::READABLE)`.
458     /// Also clearing the write readiness in this case would be incorrect. The [`AsyncFdReadyGuard::clear_ready`]
459     /// method clears all readiness flags.
460     ///
461     /// This method takes `&self`, so it is possible to call this method
462     /// concurrently with other methods on this struct. This method only
463     /// provides shared access to the inner IO resource when handling the
464     /// [`AsyncFdReadyGuard`].
465     ///
466     /// # Examples
467     ///
468     /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
469     /// splitting.
470     ///
471     /// ```no_run
472     /// use std::error::Error;
473     /// use std::io;
474     /// use std::io::{Read, Write};
475     /// use std::net::TcpStream;
476     /// use tokio::io::unix::AsyncFd;
477     /// use tokio::io::{Interest, Ready};
478     ///
479     /// #[tokio::main]
480     /// async fn main() -> Result<(), Box<dyn Error>> {
481     ///     let stream = TcpStream::connect("127.0.0.1:8080")?;
482     ///     stream.set_nonblocking(true)?;
483     ///     let stream = AsyncFd::new(stream)?;
484     ///
485     ///     loop {
486     ///         let mut guard = stream
487     ///             .ready(Interest::READABLE | Interest::WRITABLE)
488     ///             .await?;
489     ///
490     ///         if guard.ready().is_readable() {
491     ///             let mut data = vec![0; 1024];
492     ///             // Try to read data, this may still fail with `WouldBlock`
493     ///             // if the readiness event is a false positive.
494     ///             match stream.get_ref().read(&mut data) {
495     ///                 Ok(n) => {
496     ///                     println!("read {} bytes", n);
497     ///                 }
498     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
499     ///                     // a read has blocked, but a write might still succeed.
500     ///                     // clear only the read readiness.
501     ///                     guard.clear_ready_matching(Ready::READABLE);
502     ///                     continue;
503     ///                 }
504     ///                 Err(e) => {
505     ///                     return Err(e.into());
506     ///                 }
507     ///             }
508     ///         }
509     ///
510     ///         if guard.ready().is_writable() {
511     ///             // Try to write data, this may still fail with `WouldBlock`
512     ///             // if the readiness event is a false positive.
513     ///             match stream.get_ref().write(b"hello world") {
514     ///                 Ok(n) => {
515     ///                     println!("write {} bytes", n);
516     ///                 }
517     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
518     ///                     // a write has blocked, but a read might still succeed.
519     ///                     // clear only the write readiness.
520     ///                     guard.clear_ready_matching(Ready::WRITABLE);
521     ///                     continue;
522     ///                 }
523     ///                 Err(e) => {
524     ///                     return Err(e.into());
525     ///                 }
526     ///             }
527     ///         }
528     ///     }
529     /// }
530     /// ```
ready(&self, interest: Interest) -> io::Result<AsyncFdReadyGuard<'_, T>>531     pub async fn ready(&self, interest: Interest) -> io::Result<AsyncFdReadyGuard<'_, T>> {
532         let event = self.registration.readiness(interest).await?;
533 
534         Ok(AsyncFdReadyGuard {
535             async_fd: self,
536             event: Some(event),
537         })
538     }
539 
540     /// Waits for any of the requested ready states, returning a
541     /// [`AsyncFdReadyMutGuard`] that must be dropped to resume
542     /// polling for the requested ready states.
543     ///
544     /// The function may complete without the file descriptor being ready. This is a
545     /// false-positive and attempting an operation will return with
546     /// `io::ErrorKind::WouldBlock`. The function can also return with an empty
547     /// [`Ready`] set, so you should always check the returned value and possibly
548     /// wait again if the requested states are not set.
549     ///
550     /// When an IO operation does return `io::ErrorKind::WouldBlock`, the readiness must be cleared.
551     /// When a combined interest is used, it is important to clear only the readiness
552     /// that is actually observed to block. For instance when the combined
553     /// interest `Interest::READABLE | Interest::WRITABLE` is used, and a read blocks, only
554     /// read readiness should be cleared using the [`AsyncFdReadyMutGuard::clear_ready_matching`] method:
555     /// `guard.clear_ready_matching(Ready::READABLE)`.
556     /// Also clearing the write readiness in this case would be incorrect.
557     /// The [`AsyncFdReadyMutGuard::clear_ready`] method clears all readiness flags.
558     ///
559     /// This method takes `&mut self`, so it is possible to access the inner IO
560     /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
561     ///
562     /// # Examples
563     ///
564     /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
565     /// splitting.
566     ///
567     /// ```no_run
568     /// use std::error::Error;
569     /// use std::io;
570     /// use std::io::{Read, Write};
571     /// use std::net::TcpStream;
572     /// use tokio::io::unix::AsyncFd;
573     /// use tokio::io::{Interest, Ready};
574     ///
575     /// #[tokio::main]
576     /// async fn main() -> Result<(), Box<dyn Error>> {
577     ///     let stream = TcpStream::connect("127.0.0.1:8080")?;
578     ///     stream.set_nonblocking(true)?;
579     ///     let mut stream = AsyncFd::new(stream)?;
580     ///
581     ///     loop {
582     ///         let mut guard = stream
583     ///             .ready_mut(Interest::READABLE | Interest::WRITABLE)
584     ///             .await?;
585     ///
586     ///         if guard.ready().is_readable() {
587     ///             let mut data = vec![0; 1024];
588     ///             // Try to read data, this may still fail with `WouldBlock`
589     ///             // if the readiness event is a false positive.
590     ///             match guard.get_inner_mut().read(&mut data) {
591     ///                 Ok(n) => {
592     ///                     println!("read {} bytes", n);
593     ///                 }
594     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
595     ///                     // a read has blocked, but a write might still succeed.
596     ///                     // clear only the read readiness.
597     ///                     guard.clear_ready_matching(Ready::READABLE);
598     ///                     continue;
599     ///                 }
600     ///                 Err(e) => {
601     ///                     return Err(e.into());
602     ///                 }
603     ///             }
604     ///         }
605     ///
606     ///         if guard.ready().is_writable() {
607     ///             // Try to write data, this may still fail with `WouldBlock`
608     ///             // if the readiness event is a false positive.
609     ///             match guard.get_inner_mut().write(b"hello world") {
610     ///                 Ok(n) => {
611     ///                     println!("write {} bytes", n);
612     ///                 }
613     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
614     ///                     // a write has blocked, but a read might still succeed.
615     ///                     // clear only the write readiness.
616     ///                     guard.clear_ready_matching(Ready::WRITABLE);
617     ///                     continue;
618     ///                 }
619     ///                 Err(e) => {
620     ///                     return Err(e.into());
621     ///                 }
622     ///             }
623     ///         }
624     ///     }
625     /// }
626     /// ```
ready_mut( &mut self, interest: Interest, ) -> io::Result<AsyncFdReadyMutGuard<'_, T>>627     pub async fn ready_mut(
628         &mut self,
629         interest: Interest,
630     ) -> io::Result<AsyncFdReadyMutGuard<'_, T>> {
631         let event = self.registration.readiness(interest).await?;
632 
633         Ok(AsyncFdReadyMutGuard {
634             async_fd: self,
635             event: Some(event),
636         })
637     }
638 
639     /// Waits for the file descriptor to become readable, returning a
640     /// [`AsyncFdReadyGuard`] that must be dropped to resume read-readiness
641     /// polling.
642     ///
643     /// This method takes `&self`, so it is possible to call this method
644     /// concurrently with other methods on this struct. This method only
645     /// provides shared access to the inner IO resource when handling the
646     /// [`AsyncFdReadyGuard`].
647     #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
readable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>>648     pub async fn readable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
649         self.ready(Interest::READABLE).await
650     }
651 
652     /// Waits for the file descriptor to become readable, returning a
653     /// [`AsyncFdReadyMutGuard`] that must be dropped to resume read-readiness
654     /// polling.
655     ///
656     /// This method takes `&mut self`, so it is possible to access the inner IO
657     /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
658     #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
readable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>>659     pub async fn readable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
660         self.ready_mut(Interest::READABLE).await
661     }
662 
663     /// Waits for the file descriptor to become writable, returning a
664     /// [`AsyncFdReadyGuard`] that must be dropped to resume write-readiness
665     /// polling.
666     ///
667     /// This method takes `&self`, so it is possible to call this method
668     /// concurrently with other methods on this struct. This method only
669     /// provides shared access to the inner IO resource when handling the
670     /// [`AsyncFdReadyGuard`].
671     #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
writable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>>672     pub async fn writable<'a>(&'a self) -> io::Result<AsyncFdReadyGuard<'a, T>> {
673         self.ready(Interest::WRITABLE).await
674     }
675 
676     /// Waits for the file descriptor to become writable, returning a
677     /// [`AsyncFdReadyMutGuard`] that must be dropped to resume write-readiness
678     /// polling.
679     ///
680     /// This method takes `&mut self`, so it is possible to access the inner IO
681     /// resource mutably when handling the [`AsyncFdReadyMutGuard`].
682     #[allow(clippy::needless_lifetimes)] // The lifetime improves rustdoc rendering.
writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>>683     pub async fn writable_mut<'a>(&'a mut self) -> io::Result<AsyncFdReadyMutGuard<'a, T>> {
684         self.ready_mut(Interest::WRITABLE).await
685     }
686 
687     /// Reads or writes from the file descriptor using a user-provided IO operation.
688     ///
689     /// The `async_io` method is a convenience utility that waits for the file
690     /// descriptor to become ready, and then executes the provided IO operation.
691     /// Since file descriptors may be marked ready spuriously, the closure will
692     /// be called repeatedly until it returns something other than a
693     /// [`WouldBlock`] error. This is done using the following loop:
694     ///
695     /// ```no_run
696     /// # use std::io::{self, Result};
697     /// # struct Dox<T> { inner: T }
698     /// # impl<T> Dox<T> {
699     /// #     async fn writable(&self) -> Result<&Self> {
700     /// #         Ok(self)
701     /// #     }
702     /// #     fn try_io<R>(&self, _: impl FnMut(&T) -> Result<R>) -> Result<Result<R>> {
703     /// #         panic!()
704     /// #     }
705     /// async fn async_io<R>(&self, mut f: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
706     ///     loop {
707     ///         // or `readable` if called with the read interest.
708     ///         let guard = self.writable().await?;
709     ///
710     ///         match guard.try_io(&mut f) {
711     ///             Ok(result) => return result,
712     ///             Err(_would_block) => continue,
713     ///         }
714     ///     }
715     /// }
716     /// # }
717     /// ```
718     ///
719     /// The closure should only return a [`WouldBlock`] error if it has performed
720     /// an IO operation on the file descriptor that failed due to the file descriptor not being
721     /// ready. Returning a [`WouldBlock`] error in any other situation will
722     /// incorrectly clear the readiness flag, which can cause the file descriptor to
723     /// behave incorrectly.
724     ///
725     /// The closure should not perform the IO operation using any of the methods
726     /// defined on the Tokio [`AsyncFd`] type, as this will mess with the
727     /// readiness flag and can cause the file descriptor to behave incorrectly.
728     ///
729     /// This method is not intended to be used with combined interests.
730     /// The closure should perform only one type of IO operation, so it should not
731     /// require more than one ready state. This method may panic or sleep forever
732     /// if it is called with a combined interest.
733     ///
734     /// # Examples
735     ///
736     /// This example sends some bytes on the inner [`std::net::UdpSocket`]. The `async_io`
737     /// method waits for readiness, and retries if the send operation does block. This example
738     /// is equivalent to the one given for [`try_io`].
739     ///
740     /// ```no_run
741     /// use tokio::io::{Interest, unix::AsyncFd};
742     ///
743     /// use std::io;
744     /// use std::net::UdpSocket;
745     ///
746     /// #[tokio::main]
747     /// async fn main() -> io::Result<()> {
748     ///     let socket = UdpSocket::bind("0.0.0.0:8080")?;
749     ///     socket.set_nonblocking(true)?;
750     ///     let async_fd = AsyncFd::new(socket)?;
751     ///
752     ///     let written = async_fd
753     ///         .async_io(Interest::WRITABLE, |inner| inner.send(&[1, 2]))
754     ///         .await?;
755     ///
756     ///     println!("wrote {written} bytes");
757     ///
758     ///     Ok(())
759     /// }
760     /// ```
761     ///
762     /// [`try_io`]: AsyncFdReadyGuard::try_io
763     /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
async_io<R>( &self, interest: Interest, mut f: impl FnMut(&T) -> io::Result<R>, ) -> io::Result<R>764     pub async fn async_io<R>(
765         &self,
766         interest: Interest,
767         mut f: impl FnMut(&T) -> io::Result<R>,
768     ) -> io::Result<R> {
769         self.registration
770             .async_io(interest, || f(self.get_ref()))
771             .await
772     }
773 
774     /// Reads or writes from the file descriptor using a user-provided IO operation.
775     ///
776     /// The behavior is the same as [`async_io`], except that the closure can mutate the inner
777     /// value of the [`AsyncFd`].
778     ///
779     /// [`async_io`]: AsyncFd::async_io
async_io_mut<R>( &mut self, interest: Interest, mut f: impl FnMut(&mut T) -> io::Result<R>, ) -> io::Result<R>780     pub async fn async_io_mut<R>(
781         &mut self,
782         interest: Interest,
783         mut f: impl FnMut(&mut T) -> io::Result<R>,
784     ) -> io::Result<R> {
785         self.registration
786             .async_io(interest, || f(self.inner.as_mut().unwrap()))
787             .await
788     }
789 }
790 
791 impl<T: AsRawFd> AsRawFd for AsyncFd<T> {
as_raw_fd(&self) -> RawFd792     fn as_raw_fd(&self) -> RawFd {
793         self.inner.as_ref().unwrap().as_raw_fd()
794     }
795 }
796 
797 impl<T: AsRawFd> std::os::unix::io::AsFd for AsyncFd<T> {
as_fd(&self) -> std::os::unix::io::BorrowedFd<'_>798     fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
799         unsafe { std::os::unix::io::BorrowedFd::borrow_raw(self.as_raw_fd()) }
800     }
801 }
802 
803 impl<T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFd<T> {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result804     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
805         f.debug_struct("AsyncFd")
806             .field("inner", &self.inner)
807             .finish()
808     }
809 }
810 
811 impl<T: AsRawFd> Drop for AsyncFd<T> {
drop(&mut self)812     fn drop(&mut self) {
813         let _ = self.take_inner();
814     }
815 }
816 
817 impl<'a, Inner: AsRawFd> AsyncFdReadyGuard<'a, Inner> {
818     /// Indicates to tokio that the file descriptor is no longer ready. All
819     /// internal readiness flags will be cleared, and tokio will wait for the
820     /// next edge-triggered readiness notification from the OS.
821     ///
822     /// This function is commonly used with guards returned by [`AsyncFd::readable`] and
823     /// [`AsyncFd::writable`].
824     ///
825     /// It is critical that this function not be called unless your code
826     /// _actually observes_ that the file descriptor is _not_ ready. Do not call
827     /// it simply because, for example, a read succeeded; it should be called
828     /// when a read is observed to block.
clear_ready(&mut self)829     pub fn clear_ready(&mut self) {
830         if let Some(event) = self.event.take() {
831             self.async_fd.registration.clear_readiness(event);
832         }
833     }
834 
835     /// Indicates to tokio that the file descriptor no longer has a specific readiness.
836     /// The internal readiness flag will be cleared, and tokio will wait for the
837     /// next edge-triggered readiness notification from the OS.
838     ///
839     /// This function is useful in combination with the [`AsyncFd::ready`] method when a
840     /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used.
841     ///
842     /// It is critical that this function not be called unless your code
843     /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`.
844     /// Do not call it simply because, for example, a read succeeded; it should be called
845     /// when a read is observed to block. Only clear the specific readiness that is observed to
846     /// block. For example when a read blocks when using a combined interest,
847     /// only clear `Ready::READABLE`.
848     ///
849     /// # Examples
850     ///
851     /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
852     /// splitting.
853     ///
854     /// ```no_run
855     /// use std::error::Error;
856     /// use std::io;
857     /// use std::io::{Read, Write};
858     /// use std::net::TcpStream;
859     /// use tokio::io::unix::AsyncFd;
860     /// use tokio::io::{Interest, Ready};
861     ///
862     /// #[tokio::main]
863     /// async fn main() -> Result<(), Box<dyn Error>> {
864     ///     let stream = TcpStream::connect("127.0.0.1:8080")?;
865     ///     stream.set_nonblocking(true)?;
866     ///     let stream = AsyncFd::new(stream)?;
867     ///
868     ///     loop {
869     ///         let mut guard = stream
870     ///             .ready(Interest::READABLE | Interest::WRITABLE)
871     ///             .await?;
872     ///
873     ///         if guard.ready().is_readable() {
874     ///             let mut data = vec![0; 1024];
875     ///             // Try to read data, this may still fail with `WouldBlock`
876     ///             // if the readiness event is a false positive.
877     ///             match stream.get_ref().read(&mut data) {
878     ///                 Ok(n) => {
879     ///                     println!("read {} bytes", n);
880     ///                 }
881     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
882     ///                     // a read has blocked, but a write might still succeed.
883     ///                     // clear only the read readiness.
884     ///                     guard.clear_ready_matching(Ready::READABLE);
885     ///                     continue;
886     ///                 }
887     ///                 Err(e) => {
888     ///                     return Err(e.into());
889     ///                 }
890     ///             }
891     ///         }
892     ///
893     ///         if guard.ready().is_writable() {
894     ///             // Try to write data, this may still fail with `WouldBlock`
895     ///             // if the readiness event is a false positive.
896     ///             match stream.get_ref().write(b"hello world") {
897     ///                 Ok(n) => {
898     ///                     println!("write {} bytes", n);
899     ///                 }
900     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
901     ///                     // a write has blocked, but a read might still succeed.
902     ///                     // clear only the write readiness.
903     ///                     guard.clear_ready_matching(Ready::WRITABLE);
904     ///                     continue;
905     ///                 }
906     ///                 Err(e) => {
907     ///                     return Err(e.into());
908     ///                 }
909     ///             }
910     ///         }
911     ///     }
912     /// }
913     /// ```
clear_ready_matching(&mut self, ready: Ready)914     pub fn clear_ready_matching(&mut self, ready: Ready) {
915         if let Some(mut event) = self.event.take() {
916             self.async_fd
917                 .registration
918                 .clear_readiness(event.with_ready(ready));
919 
920             // the event is no longer ready for the readiness that was just cleared
921             event.ready = event.ready - ready;
922 
923             if !event.ready.is_empty() {
924                 self.event = Some(event);
925             }
926         }
927     }
928 
929     /// This method should be invoked when you intentionally want to keep the
930     /// ready flag asserted.
931     ///
932     /// While this function is itself a no-op, it satisfies the `#[must_use]`
933     /// constraint on the [`AsyncFdReadyGuard`] type.
retain_ready(&mut self)934     pub fn retain_ready(&mut self) {
935         // no-op
936     }
937 
938     /// Get the [`Ready`] value associated with this guard.
939     ///
940     /// This method will return the empty readiness state if
941     /// [`AsyncFdReadyGuard::clear_ready`] has been called on
942     /// the guard.
943     ///
944     /// [`Ready`]: crate::io::Ready
ready(&self) -> Ready945     pub fn ready(&self) -> Ready {
946         match &self.event {
947             Some(event) => event.ready,
948             None => Ready::EMPTY,
949         }
950     }
951 
952     /// Performs the provided IO operation.
953     ///
954     /// If `f` returns a [`WouldBlock`] error, the readiness state associated
955     /// with this file descriptor is cleared, and the method returns
956     /// `Err(TryIoError::WouldBlock)`. You will typically need to poll the
957     /// `AsyncFd` again when this happens.
958     ///
959     /// This method helps ensure that the readiness state of the underlying file
960     /// descriptor remains in sync with the tokio-side readiness state, by
961     /// clearing the tokio-side state only when a [`WouldBlock`] condition
962     /// occurs. It is the responsibility of the caller to ensure that `f`
963     /// returns [`WouldBlock`] only if the file descriptor that originated this
964     /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
965     /// create this `AsyncFdReadyGuard`.
966     ///
967     /// # Examples
968     ///
969     /// This example sends some bytes to the inner [`std::net::UdpSocket`]. Waiting
970     /// for write-readiness and retrying when the send operation does block are explicit.
971     /// This example can be written more succinctly using [`AsyncFd::async_io`].
972     ///
973     /// ```no_run
974     /// use tokio::io::unix::AsyncFd;
975     ///
976     /// use std::io;
977     /// use std::net::UdpSocket;
978     ///
979     /// #[tokio::main]
980     /// async fn main() -> io::Result<()> {
981     ///     let socket = UdpSocket::bind("0.0.0.0:8080")?;
982     ///     socket.set_nonblocking(true)?;
983     ///     let async_fd = AsyncFd::new(socket)?;
984     ///
985     ///     let written = loop {
986     ///         let mut guard = async_fd.writable().await?;
987     ///         match guard.try_io(|inner| inner.get_ref().send(&[1, 2])) {
988     ///             Ok(result) => {
989     ///                 break result?;
990     ///             }
991     ///             Err(_would_block) => {
992     ///                 // try_io already cleared the file descriptor's readiness state
993     ///                 continue;
994     ///             }
995     ///         }
996     ///     };
997     ///
998     ///     println!("wrote {written} bytes");
999     ///
1000     ///     Ok(())
1001     /// }
1002     /// ```
1003     ///
1004     /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
1005     // Alias for old name in 0.x
1006     #[cfg_attr(docsrs, doc(alias = "with_io"))]
try_io<R>( &mut self, f: impl FnOnce(&'a AsyncFd<Inner>) -> io::Result<R>, ) -> Result<io::Result<R>, TryIoError>1007     pub fn try_io<R>(
1008         &mut self,
1009         f: impl FnOnce(&'a AsyncFd<Inner>) -> io::Result<R>,
1010     ) -> Result<io::Result<R>, TryIoError> {
1011         let result = f(self.async_fd);
1012 
1013         match result {
1014             Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1015                 self.clear_ready();
1016                 Err(TryIoError(()))
1017             }
1018             result => Ok(result),
1019         }
1020     }
1021 
1022     /// Returns a shared reference to the inner [`AsyncFd`].
get_ref(&self) -> &'a AsyncFd<Inner>1023     pub fn get_ref(&self) -> &'a AsyncFd<Inner> {
1024         self.async_fd
1025     }
1026 
1027     /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
get_inner(&self) -> &'a Inner1028     pub fn get_inner(&self) -> &'a Inner {
1029         self.get_ref().get_ref()
1030     }
1031 }
1032 
1033 impl<'a, Inner: AsRawFd> AsyncFdReadyMutGuard<'a, Inner> {
1034     /// Indicates to tokio that the file descriptor is no longer ready. All
1035     /// internal readiness flags will be cleared, and tokio will wait for the
1036     /// next edge-triggered readiness notification from the OS.
1037     ///
1038     /// This function is commonly used with guards returned by [`AsyncFd::readable_mut`] and
1039     /// [`AsyncFd::writable_mut`].
1040     ///
1041     /// It is critical that this function not be called unless your code
1042     /// _actually observes_ that the file descriptor is _not_ ready. Do not call
1043     /// it simply because, for example, a read succeeded; it should be called
1044     /// when a read is observed to block.
clear_ready(&mut self)1045     pub fn clear_ready(&mut self) {
1046         if let Some(event) = self.event.take() {
1047             self.async_fd.registration.clear_readiness(event);
1048         }
1049     }
1050 
1051     /// Indicates to tokio that the file descriptor no longer has a specific readiness.
1052     /// The internal readiness flag will be cleared, and tokio will wait for the
1053     /// next edge-triggered readiness notification from the OS.
1054     ///
1055     /// This function is useful in combination with the [`AsyncFd::ready_mut`] method when a
1056     /// combined interest like `Interest::READABLE | Interest::WRITABLE` is used.
1057     ///
1058     /// It is critical that this function not be called unless your code
1059     /// _actually observes_ that the file descriptor is _not_ ready for the provided `Ready`.
1060     /// Do not call it simply because, for example, a read succeeded; it should be called
1061     /// when a read is observed to block. Only clear the specific readiness that is observed to
1062     /// block. For example when a read blocks when using a combined interest,
1063     /// only clear `Ready::READABLE`.
1064     ///
1065     /// # Examples
1066     ///
1067     /// Concurrently read and write to a [`std::net::TcpStream`] on the same task without
1068     /// splitting.
1069     ///
1070     /// ```no_run
1071     /// use std::error::Error;
1072     /// use std::io;
1073     /// use std::io::{Read, Write};
1074     /// use std::net::TcpStream;
1075     /// use tokio::io::unix::AsyncFd;
1076     /// use tokio::io::{Interest, Ready};
1077     ///
1078     /// #[tokio::main]
1079     /// async fn main() -> Result<(), Box<dyn Error>> {
1080     ///     let stream = TcpStream::connect("127.0.0.1:8080")?;
1081     ///     stream.set_nonblocking(true)?;
1082     ///     let mut stream = AsyncFd::new(stream)?;
1083     ///
1084     ///     loop {
1085     ///         let mut guard = stream
1086     ///             .ready_mut(Interest::READABLE | Interest::WRITABLE)
1087     ///             .await?;
1088     ///
1089     ///         if guard.ready().is_readable() {
1090     ///             let mut data = vec![0; 1024];
1091     ///             // Try to read data, this may still fail with `WouldBlock`
1092     ///             // if the readiness event is a false positive.
1093     ///             match guard.get_inner_mut().read(&mut data) {
1094     ///                 Ok(n) => {
1095     ///                     println!("read {} bytes", n);
1096     ///                 }
1097     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1098     ///                     // a read has blocked, but a write might still succeed.
1099     ///                     // clear only the read readiness.
1100     ///                     guard.clear_ready_matching(Ready::READABLE);
1101     ///                     continue;
1102     ///                 }
1103     ///                 Err(e) => {
1104     ///                     return Err(e.into());
1105     ///                 }
1106     ///             }
1107     ///         }
1108     ///
1109     ///         if guard.ready().is_writable() {
1110     ///             // Try to write data, this may still fail with `WouldBlock`
1111     ///             // if the readiness event is a false positive.
1112     ///             match guard.get_inner_mut().write(b"hello world") {
1113     ///                 Ok(n) => {
1114     ///                     println!("write {} bytes", n);
1115     ///                 }
1116     ///                 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
1117     ///                     // a write has blocked, but a read might still succeed.
1118     ///                     // clear only the write readiness.
1119     ///                     guard.clear_ready_matching(Ready::WRITABLE);
1120     ///                     continue;
1121     ///                 }
1122     ///                 Err(e) => {
1123     ///                     return Err(e.into());
1124     ///                 }
1125     ///             }
1126     ///         }
1127     ///     }
1128     /// }
1129     /// ```
clear_ready_matching(&mut self, ready: Ready)1130     pub fn clear_ready_matching(&mut self, ready: Ready) {
1131         if let Some(mut event) = self.event.take() {
1132             self.async_fd
1133                 .registration
1134                 .clear_readiness(event.with_ready(ready));
1135 
1136             // the event is no longer ready for the readiness that was just cleared
1137             event.ready = event.ready - ready;
1138 
1139             if !event.ready.is_empty() {
1140                 self.event = Some(event);
1141             }
1142         }
1143     }
1144 
1145     /// This method should be invoked when you intentionally want to keep the
1146     /// ready flag asserted.
1147     ///
1148     /// While this function is itself a no-op, it satisfies the `#[must_use]`
1149     /// constraint on the [`AsyncFdReadyGuard`] type.
retain_ready(&mut self)1150     pub fn retain_ready(&mut self) {
1151         // no-op
1152     }
1153 
1154     /// Get the [`Ready`] value associated with this guard.
1155     ///
1156     /// This method will return the empty readiness state if
1157     /// [`AsyncFdReadyGuard::clear_ready`] has been called on
1158     /// the guard.
1159     ///
1160     /// [`Ready`]: super::Ready
ready(&self) -> Ready1161     pub fn ready(&self) -> Ready {
1162         match &self.event {
1163             Some(event) => event.ready,
1164             None => Ready::EMPTY,
1165         }
1166     }
1167 
1168     /// Performs the provided IO operation.
1169     ///
1170     /// If `f` returns a [`WouldBlock`] error, the readiness state associated
1171     /// with this file descriptor is cleared, and the method returns
1172     /// `Err(TryIoError::WouldBlock)`. You will typically need to poll the
1173     /// `AsyncFd` again when this happens.
1174     ///
1175     /// This method helps ensure that the readiness state of the underlying file
1176     /// descriptor remains in sync with the tokio-side readiness state, by
1177     /// clearing the tokio-side state only when a [`WouldBlock`] condition
1178     /// occurs. It is the responsibility of the caller to ensure that `f`
1179     /// returns [`WouldBlock`] only if the file descriptor that originated this
1180     /// `AsyncFdReadyGuard` no longer expresses the readiness state that was queried to
1181     /// create this `AsyncFdReadyGuard`.
1182     ///
1183     /// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
try_io<R>( &mut self, f: impl FnOnce(&mut AsyncFd<Inner>) -> io::Result<R>, ) -> Result<io::Result<R>, TryIoError>1184     pub fn try_io<R>(
1185         &mut self,
1186         f: impl FnOnce(&mut AsyncFd<Inner>) -> io::Result<R>,
1187     ) -> Result<io::Result<R>, TryIoError> {
1188         let result = f(self.async_fd);
1189 
1190         match result {
1191             Err(err) if err.kind() == io::ErrorKind::WouldBlock => {
1192                 self.clear_ready();
1193                 Err(TryIoError(()))
1194             }
1195             result => Ok(result),
1196         }
1197     }
1198 
1199     /// Returns a shared reference to the inner [`AsyncFd`].
get_ref(&self) -> &AsyncFd<Inner>1200     pub fn get_ref(&self) -> &AsyncFd<Inner> {
1201         self.async_fd
1202     }
1203 
1204     /// Returns a mutable reference to the inner [`AsyncFd`].
get_mut(&mut self) -> &mut AsyncFd<Inner>1205     pub fn get_mut(&mut self) -> &mut AsyncFd<Inner> {
1206         self.async_fd
1207     }
1208 
1209     /// Returns a shared reference to the backing object of the inner [`AsyncFd`].
get_inner(&self) -> &Inner1210     pub fn get_inner(&self) -> &Inner {
1211         self.get_ref().get_ref()
1212     }
1213 
1214     /// Returns a mutable reference to the backing object of the inner [`AsyncFd`].
get_inner_mut(&mut self) -> &mut Inner1215     pub fn get_inner_mut(&mut self) -> &mut Inner {
1216         self.get_mut().get_mut()
1217     }
1218 }
1219 
1220 impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyGuard<'a, T> {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result1221     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1222         f.debug_struct("ReadyGuard")
1223             .field("async_fd", &self.async_fd)
1224             .finish()
1225     }
1226 }
1227 
1228 impl<'a, T: std::fmt::Debug + AsRawFd> std::fmt::Debug for AsyncFdReadyMutGuard<'a, T> {
fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result1229     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1230         f.debug_struct("MutReadyGuard")
1231             .field("async_fd", &self.async_fd)
1232             .finish()
1233     }
1234 }
1235 
/// The error type returned by [`try_io`].
///
/// This error indicates that the IO resource returned a [`WouldBlock`] error.
///
/// The type is opaque: it carries no payload and cannot be constructed
/// outside of this module. It implements [`std::error::Error`], so it can be
/// boxed or propagated with `?` like any other error.
///
/// [`WouldBlock`]: std::io::ErrorKind::WouldBlock
/// [`try_io`]: method@AsyncFdReadyGuard::try_io
#[derive(Debug)]
pub struct TryIoError(());

impl std::fmt::Display for TryIoError {
    /// Writes a short, fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("IO operation would block")
    }
}

// No underlying cause is recorded, so the default `source()` (`None`) applies.
impl std::error::Error for TryIoError {}
1244