// Copyright 2021 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use std::{
    alloc::Layout,
    any::Any,
    cell::RefCell,
    cmp::min,
    convert::{TryFrom, TryInto},
    ffi::CStr,
    future::Future,
    io,
    mem::{replace, size_of, MaybeUninit},
    os::unix::io::{AsRawFd, RawFd},
    pin::Pin,
    ptr,
    rc::Rc,
    sync::Arc,
    task::{self, Poll},
    time::Duration,
};

use anyhow::{anyhow, bail, ensure, Context};
use data_model::IoBufMut;
use io_uring::{
    cqueue::{self, buffer_select},
    opcode, squeue,
    types::{Fd, FsyncFlags, SubmitArgs, Timespec},
    Builder, IoUring, Probe,
};
use once_cell::sync::{Lazy, OnceCell};
use slab::Slab;
use sys_util::{
    error, warn, AsRawDescriptor, EventFd, FromRawDescriptor, LayoutAllocation, SafeDescriptor,
};
use thiserror::Error as ThisError;

use super::cmsg::*;
use crate::{AsIoBufs, OwnedIoBuf};

// For now all buffers live in the same buffer group.
const BUFFER_GROUP: u16 = 0;
// The top 8 bits of the buffer id encode the index of the LayoutAllocation and the bottom 8 bits
// encode the index of the buffer within that allocation.
const ALLOC_IDX_SHIFT: usize = 8;
const BUFFERS_PER_ALLOC: u16 = 32;
const BUFFER_IDX_MASK: u16 = (1 << ALLOC_IDX_SHIFT) - 1;

// Number of entries in the ring.
const NUM_ENTRIES: u32 = 256;

// The user_data for the waker. Since our user_data is based on the index in a Slab we'll run out of
// memory well before a real operation gets usize::MAX as the index.
const WAKER_DATA: usize = usize::MAX;

// The global IoUring instance. Each thread-local IoUring shares its kernel backend with this
// instance.
static GLOBAL_URING: OnceCell<IoUring> = OnceCell::new();
static URING_STATUS: Lazy<UringStatus> = Lazy::new(|| {
    let mut utsname = MaybeUninit::zeroed();

    // Safe because this will only modify `utsname` and we check the return value.
    let res = unsafe { libc::uname(utsname.as_mut_ptr()) };
    if res < 0 {
        return UringStatus::Disabled;
    }

    // Safe because the kernel has initialized `utsname`.
    let utsname = unsafe { utsname.assume_init() };

    // Safe because the pointer is valid and the kernel guarantees that this is a valid C string.
    let release = unsafe { CStr::from_ptr(utsname.release.as_ptr()) };

    let mut components = match release.to_str().map(|r| r.split('.').map(str::parse)) {
        Ok(c) => c,
        Err(_) => return UringStatus::Disabled,
    };

    // Kernels older than 5.10 either didn't support io_uring or had bugs in the implementation.
    match (components.next(), components.next()) {
        (Some(Ok(major)), Some(Ok(minor))) if (major, minor) >= (5, 10) => {
            // The kernel version is new enough so check if we can actually make a uring context.
            if probe_uring().is_ok() {
                UringStatus::Enabled(major, minor)
            } else {
                UringStatus::Disabled
            }
        }
        _ => UringStatus::Disabled,
    }
});
static EXT_ARG_SUPPORTED: Lazy<bool> = Lazy::new(
    || matches!(&*URING_STATUS, UringStatus::Enabled(major, minor) if (*major, *minor) >= (5, 11)),
);

#[derive(Debug)]
enum UringStatus {
    Enabled(usize, usize),
    Disabled,
}

thread_local! (static THREAD_STATE: OnceCell<Rc<RefCell<State>>> = OnceCell::new());
fn new_state() -> anyhow::Result<Rc<RefCell<State>>> {
    State::new().map(RefCell::new).map(Rc::new)
}

fn with_state<F, R>(f: F) -> anyhow::Result<R>
where
    F: FnOnce(&mut State) -> anyhow::Result<R>,
{
    THREAD_STATE.with(|thread_state| {
        let state = thread_state.get_or_try_init(new_state)?;
        f(&mut state.borrow_mut())
    })
}

fn clone_state() -> anyhow::Result<Rc<RefCell<State>>> {
    THREAD_STATE.with(|thread_state| thread_state.get_or_try_init(new_state).map(Rc::clone))
}

#[derive(Debug, ThisError)]
enum ErrorContext {
    #[error("`io_uring_enter` failed")]
    EnterFailed,
    #[error("failed to return buffer to kernel")]
    ReturnBuffer,
    #[error("`SubmissionQueue` full")]
    SubmissionQueueFull,
}

fn probe_uring() -> anyhow::Result<()> {
    const REQUIRED_OPS: &[u8] = &[
        opcode::Accept::CODE,
        opcode::AsyncCancel::CODE,
        opcode::Connect::CODE,
        opcode::Fallocate::CODE,
        opcode::Fsync::CODE,
        opcode::PollAdd::CODE,
        opcode::ProvideBuffers::CODE,
        opcode::Read::CODE,
        opcode::Readv::CODE,
        opcode::RecvMsg::CODE,
        opcode::SendMsg::CODE,
        opcode::Write::CODE,
        opcode::Writev::CODE,
    ];
    let uring = IoUring::new(8)?;
    let mut probe = Probe::new();
    uring.submitter().register_probe(&mut probe)?;
    if REQUIRED_OPS
        .iter()
        .all(|&opcode| probe.is_supported(opcode))
    {
        Ok(())
    } else {
        bail!("Not all required uring operations supported")
    }
}

pub fn use_uring() -> bool {
    match &*URING_STATUS {
        UringStatus::Enabled(_, _) => true,
        UringStatus::Disabled => false,
    }
}

struct State {
    uring: IoUring,
    waker: Waker,
    ops: Slab<OpStatus>,
    buffers: [LayoutAllocation; 5],
}

impl State {
    fn new() -> anyhow::Result<State> {
        let global_uring = GLOBAL_URING.get_or_try_init(|| IoUring::new(NUM_ENTRIES))?;

        // The `setup_attach_wq` call here ensures that each thread shares the same backend in the
        // kernel but has its own separate completion and submission queues, avoiding the need to do
        // expensive synchronization when touching those queues in userspace.
        let uring = Builder::default()
            .setup_attach_wq(global_uring.as_raw_fd())
            .build(NUM_ENTRIES)?;
        let waker = Waker::new()?;
        let ops = Slab::new();
        let buffers = [
            new_buffer_allocation(64),
            new_buffer_allocation(128),
            new_buffer_allocation(256),
            new_buffer_allocation(512),
            new_buffer_allocation(1024),
        ];
        let mut state = State {
            uring,
            waker,
            ops,
            buffers,
        };

        for (idx, alloc) in state.buffers.iter().enumerate() {
            let layout = alloc.layout();
            debug_assert_eq!(layout.size(), layout.align() * BUFFERS_PER_ALLOC as usize);

            // We can't use `State::provide_buffers` directly here because `state` is already
            // borrowed by the for loop.
            let entry = opcode::ProvideBuffers::new(
                alloc.as_ptr(),
                layout.align() as i32,
                BUFFERS_PER_ALLOC,
                BUFFER_GROUP,
                pack_buffer_id(idx, 0),
            )
            .build()
            .user_data(idx as u64);

            // Safety: The allocation is valid for `layout.align() * BUFFERS_PER_ALLOC` bytes of
            // memory and is valid for the lifetime of the `IoUring` because it lives in the same
            // struct.
            unsafe { state.uring.submission().push(&entry) }
                .context("failed to submit `ProvideBuffers` operation")?;
        }

        // Wait for all the `ProvideBuffers` operations to finish.
        let count = state
            .uring
            .submit_and_wait(state.buffers.len())
            .context(ErrorContext::EnterFailed)?;
        debug_assert_eq!(count, state.buffers.len());

        for entry in state.uring.completion() {
            if entry.result() < 0 {
                return Err(io::Error::from_raw_os_error(-entry.result()))
                    .context("failed to provide buffers to io_uring");
            }
        }

        // Now start the waker that other threads can use to break us out of an `io_uring_enter`
        // syscall.
        state.submit_waker()?;

        Ok(state)
    }

    fn getevents(&mut self) -> anyhow::Result<()> {
        let (submitter, squeue, _) = self.uring.split();
        let to_submit = squeue.len();
        let min_complete = 0;

        // This flag should really be provided by the `io_uring` crate directly.
        const IORING_ENTER_GETEVENTS: u32 = 1 << 0;

        // We need to manually call `Submitter::enter` here because `submit_and_wait` will only add
        // the `IORING_ENTER_GETEVENTS` flag when `want > 0`.
        // Safety: the kernel will only read `to_submit` entries from the submission queue,
        // which have all been initialized.
        unsafe {
            submitter.enter::<libc::sigset_t>(
                to_submit as u32,
                min_complete,
                IORING_ENTER_GETEVENTS,
                None,
            )
        }
        .map(drop)
        .context(ErrorContext::EnterFailed)
    }

    fn submit_timer(&mut self, ts: Box<Timespec>) -> anyhow::Result<()> {
        let slot = self.ops.vacant_entry();
        let entry = opcode::Timeout::new(&*ts)
            .build()
            .user_data(slot.key() as u64);

        slot.insert(OpStatus::System(ts));

        // Safety: the entry is valid and we can guarantee that the Timespec will live for the
        // lifetime of the operation.
        unsafe { self.submit_entry(&entry) }
    }

    fn wait(&mut self, timeout: Option<Duration>) -> anyhow::Result<()> {
        if let Some(timeout) = timeout {
            if timeout > Duration::from_secs(0) {
                let ts = Timespec::new()
                    .sec(timeout.as_secs())
                    .nsec(timeout.subsec_nanos());
                if *EXT_ARG_SUPPORTED {
                    let args = SubmitArgs::new().timespec(&ts);
                    self.uring
                        .submitter()
                        .submit_with_args(1, &args)
                        .map(drop)
                        .context(ErrorContext::EnterFailed)
                } else {
                    // Since `IORING_ENTER_EXT_ARG` is not supported we need to add a `Timeout`
                    // operation and then do a regular wait.
                    self.submit_timer(Box::new(ts))?;
                    self.uring
                        .submit_and_wait(1)
                        .map(drop)
                        .context(ErrorContext::EnterFailed)
                }
            } else {
                // A zero timeout means we should submit new operations and fetch any completed
                // operations without blocking.
                self.getevents()
            }
        } else {
            self.uring
                .submit_and_wait(1)
                .map(drop)
                .context(ErrorContext::EnterFailed)
        }
    }

    // Dispatches all completed IO operations. If the thread waker fired, returns its buffer to
    // the kernel and resubmits the waker.
    fn dispatch(&mut self) -> anyhow::Result<()> {
        let mut waker_entry = None;
        let mut needs_cleanup = Vec::new();
        for entry in self.uring.completion() {
            let idx = entry.user_data() as usize;
            if idx == WAKER_DATA {
                waker_entry = Some(entry);
                continue;
            }
            let status = replace(&mut self.ops[idx], OpStatus::Ready(entry));
            match status {
                OpStatus::New(_) => {
                    panic!("Received completion for operation that has not been started")
                }
                OpStatus::Waiting(w) => w.wake(),
                OpStatus::Ready(_) => panic!("Received completion for finished operation"),
                OpStatus::Canceled(cleanup, _) => {
                    let entry = if let OpStatus::Ready(entry) = self.ops.remove(idx) {
                        entry
                    } else {
                        panic!();
                    };
                    if let Some(c) = cleanup {
                        needs_cleanup.push((c, entry));
                    }
                }
                OpStatus::System(_) => drop(self.ops.remove(idx)),
                OpStatus::Processing | OpStatus::Finished => {
                    panic!("Unexpected state for `OpStatus`")
                }
            }
        }

        if !needs_cleanup.is_empty() || waker_entry.is_some() {
            // When there is a completion queue overflow, we can end up in an infinite loop:
            // submit_entry() -> cq_overflow() -> dispatch() -> provide_buffers() / submit_waker()
            // -> submit_entry(). Now that we've drained the completion queue, submit any pending
            // operations in the submission queue to break the loop.
            if self.uring.submission().cq_overflow() {
                self.uring.submit()?;
            }
        }

        if let Some(entry) = waker_entry {
            // We were woken up so return the buffer to the kernel and resubmit the waker.
            let SelectedBuffer { ptr, len, cap, bid } = self
                .get_selected_buffer(entry)
                .context("failed to read from waker")?;
            debug_assert_eq!(len, size_of::<u64>());

            // Safety: this was a buffer that we previously provided so we know that it is valid and
            // lives at least as long as the `IoUring`.
            unsafe { self.provide_buffers(ptr, cap as i32, 1, BUFFER_GROUP, bid) }
                .context(ErrorContext::ReturnBuffer)?;

            self.submit_waker()?;
        }

        for (cleanup, entry) in needs_cleanup {
            cleanup(self, entry);
        }

        Ok(())
    }

    // Safety: This function has the same safety requirements as `SubmissionQueue::push`, namely that
    // the parameters of `entry` are valid and will be valid for the entire duration of the operation.
    unsafe fn submit_entry(&mut self, entry: &squeue::Entry) -> anyhow::Result<()> {
        if self.uring.submission().push(entry).is_err() {
            if self.uring.submission().cq_overflow() {
                self.dispatch()
                    .context("failed to dispatch completed ops during cqueue overflow")?;
            }
            self.uring.submit().context(ErrorContext::EnterFailed)?;
            self.uring
                .submission()
                .push(entry)
                .map_err(|_| io::Error::from_raw_os_error(libc::EBUSY))
                .context(ErrorContext::SubmissionQueueFull)
        } else {
            Ok(())
        }
    }

    fn submit_waker(&mut self) -> anyhow::Result<()> {
        let entry = opcode::Read::new(
            Fd(self.waker.0.as_raw_fd()),
            ptr::null_mut(),
            size_of::<u64>() as u32,
        )
        .buf_group(BUFFER_GROUP)
        .build()
        .user_data(WAKER_DATA as u64)
        .flags(squeue::Flags::BUFFER_SELECT);

        // Safety: the entry is valid and doesn't reference any memory.
        unsafe { self.submit_entry(&entry) }
    }

    // Safety: `buffer` must be a valid pointer to `len * nbufs` bytes of memory and must live at
    // least as long as `self`.
    unsafe fn provide_buffers(
        &mut self,
        buffer: *mut u8,
        len: i32,
        nbufs: u16,
        bgid: u16,
        bid: u16,
    ) -> anyhow::Result<()> {
        let slot = self.ops.vacant_entry();
        let idx = slot.key();
        let entry = opcode::ProvideBuffers::new(buffer, len, nbufs, bgid, bid)
            .build()
            .user_data(idx as u64);

        slot.insert(OpStatus::System(Box::new(())));

        // Safety: `buffer` is a valid pointer to `len * nbufs` bytes of memory and will be valid
        // for the lifetime of the `IoUring` because it lives at least as long as `self`.
        self.submit_entry(&entry)
    }

    // Returns the buffer selected by the kernel for `entry`. Panics if no buffer was selected by
    // the kernel.
    fn get_selected_buffer(&self, entry: cqueue::Entry) -> anyhow::Result<SelectedBuffer> {
        let len = entry_to_result(entry.clone())?;

        let bid = buffer_select(entry.flags()).expect("No buffer selected");
        let (alloc_idx, buffer_idx) = unpack_buffer_id(bid);
        let alloc = &self.buffers[alloc_idx];
        let layout = alloc.layout();
        let cap = layout.align();

        debug_assert!(len <= cap);
        debug_assert!(buffer_idx * layout.align() <= layout.size() - len);

        // Safety: the allocation is valid for at least `buffer_idx * layout.align()` bytes of
        // memory.
        let ptr = unsafe { alloc.as_ptr::<u8>().add(buffer_idx * layout.align()) };
        Ok(SelectedBuffer { ptr, len, cap, bid })
    }

    // Copies data from the kernel-selected buffer into the user-provided buffer and returns the
    // selected buffer to the kernel. Panics if no buffer was selected by the kernel.
    fn copy_from_selected_buffer(
        &mut self,
        entry: cqueue::Entry,
        buf: &mut [u8],
    ) -> anyhow::Result<usize> {
        let SelectedBuffer { ptr, len, cap, bid } = self.get_selected_buffer(entry)?;
        let count = min(len, buf.len());

        // Safety: both pointers point to at least `count` bytes of allocated memory.
        unsafe { ptr::copy_nonoverlapping(ptr, buf.as_mut_ptr(), count) };

        // Now that we've copied the data out we need to return the buffer to the kernel.
        // Safety: this is a buffer that was previously registered with the kernel and the caller
        // that registered it was required to guarantee that it lives as long as the `IoUring`.
        // We're reusing that guarantee here.
        unsafe { self.provide_buffers(ptr, cap as i32, 1, BUFFER_GROUP, bid) }
            .context(ErrorContext::ReturnBuffer)?;

        Ok(count)
    }

    fn cancel_op(&mut self, idx: usize) -> anyhow::Result<()> {
        // We're still waiting for the underlying IO to complete so try to cancel it if we can.
        let slot = self.ops.vacant_entry();
        let cancel = opcode::AsyncCancel::new(idx as u64)
            .build()
            .user_data(slot.key() as u64);

        slot.insert(OpStatus::System(Box::new(())));

        // Safety: The entry is valid and doesn't reference any memory.
        unsafe { self.submit_entry(&cancel) }.context("failed to submit async cancellation")
    }

    // TODO: Do we actually need any of this? Once the IoUring is dropped, the fd should be closed
    // so it doesn't seem necessary for us to actually drain it. It would be weird if the kernel
    // kept around references to memory once the uring fd is gone.
    // fn shutdown(&mut self, deadline: Instant) -> anyhow::Result<()> {
    //     // Every async operation owns a reference to the `State` and either removes itself from
    //     // `self.ops` or changes its status to `Canceled` when it is dropped so `self.ops` shouldn't
    //     // contain anything other than canceled and system operations.
    //     let pending = self
    //         .ops
    //         .iter_mut()
    //         .filter_map(|(idx, op)| match replace(op, OpStatus::Processing) {
    //             OpStatus::System(data) => {
    //                 *op = OpStatus::Canceled(data);
    //                 Some(idx)
    //             }
    //             OpStatus::Canceled(data) => {
    //                 *op = OpStatus::Canceled(data);
    //                 None
    //             }
    //             _ => panic!(
    //                 "Thread state dropped while there are still non-canceled operations pending"
    //             ),
    //         })
    //         .collect::<Vec<_>>();

    //     for idx in pending {
    //         self.cancel_op(idx)?;
    //     }

    //     // Wait for all the canceled operations to finish.
    //     if !self.ops.is_empty() {
    //         self.wait(
    //             self.ops.len(),
    //             Some(deadline.saturating_duration_since(Instant::now())),
    //         )?;
    //     }
    //     self.dispatch()?;

    //     let ext_arg_supported = *EXT_ARG_SUPPORTED;
    //     // When `IORING_ENTER_EXT_ARG` is not supported, there may still be a timer op left in
    //     // `self.ops`.
    //     if (ext_arg_supported && !self.ops.is_empty()) || (!ext_arg_supported && self.ops.len() > 1)
    //     {
    //         return Err(anyhow!(io::Error::from_raw_os_error(libc::ETIMEDOUT))).context(format!(
    //             "Still waiting for {} operations to finish",
    //             self.ops.len()
    //         ));
    //     }

    //     // The `Waker` is the last pending operation.
    //     self.waker.wake().context("failed to wake Waker")?;
    //     self.wait(1, Some(deadline.saturating_duration_since(Instant::now())))?;

    //     Ok(())
    // }
}

// TODO: Do we actually need this?  See State::shutdown above.
// impl Drop for State {
//     fn drop(&mut self) {
//         // How long we should wait to drain the `IoUring` before giving up.
//         const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
//         if let Err(e) = self.shutdown(Instant::now() + SHUTDOWN_TIMEOUT) {
//             process::abort();
//         }
//     }
// }

struct SelectedBuffer {
    ptr: *mut u8,
    len: usize,
    cap: usize,
    bid: u16,
}

fn new_buffer_allocation(size: usize) -> LayoutAllocation {
    let layout = Layout::from_size_align(size * usize::from(BUFFERS_PER_ALLOC), size)
        .expect("Invalid layout");
    LayoutAllocation::uninitialized(layout)
}

fn pack_buffer_id(alloc_idx: usize, buffer_idx: usize) -> u16 {
    debug_assert!(alloc_idx << ALLOC_IDX_SHIFT <= u16::MAX as usize);
    debug_assert_eq!(buffer_idx & usize::from(BUFFER_IDX_MASK), buffer_idx);
    ((alloc_idx << ALLOC_IDX_SHIFT) | buffer_idx) as u16
}

// Returns the index of the `LayoutAllocation` and the index of the buffer within that allocation.
fn unpack_buffer_id(bid: u16) -> (usize, usize) {
    let alloc_idx = (bid >> ALLOC_IDX_SHIFT).into();
    let buffer_idx = (bid & BUFFER_IDX_MASK).into();
    (alloc_idx, buffer_idx)
}
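
// Illustrative example (not part of the original file): a small unit test sketching how the
// buffer id encoding round-trips through `pack_buffer_id` and `unpack_buffer_id`. The module
// and test names are assumptions added for illustration.
#[cfg(test)]
mod buffer_id_example {
    use super::*;

    #[test]
    fn pack_unpack_round_trip() {
        // Allocation index 3 goes in the top 8 bits, buffer index 17 in the bottom 8 bits.
        let bid = pack_buffer_id(3, 17);
        assert_eq!(bid, 0x0311);
        assert_eq!(unpack_buffer_id(bid), (3, 17));
    }
}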

pub struct Waker(EventFd);
impl Waker {
    fn new() -> anyhow::Result<Waker> {
        EventFd::new()
            .map(Waker)
            .map_err(|e| anyhow!(io::Error::from(e)))
    }

    fn try_clone(&self) -> anyhow::Result<Waker> {
        self.0
            .try_clone()
            .map(Waker)
            .map_err(|e| anyhow!(io::Error::from(e)))
    }

    pub fn wake(&self) -> anyhow::Result<()> {
        self.0
            .write(1)
            .map(drop)
            .map_err(|e| anyhow!(io::Error::from(e)))
    }
}

pub fn new_waker() -> anyhow::Result<Waker> {
    with_state(|state| state.waker.try_clone())
}

// Wait for more events.
pub fn wait(timeout: Option<Duration>) -> anyhow::Result<()> {
    with_state(|state| state.wait(timeout))
}

// Wake up any tasks that are ready.
pub fn dispatch() -> anyhow::Result<()> {
    with_state(|state| state.dispatch())
}
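
// Illustrative sketch (an assumption, not part of the original API surface): one turn of a
// single-threaded executor loop built on this reactor. The function name and structure are
// hypothetical; a real executor would interleave this with polling its task queue.
#[allow(dead_code)]
fn example_reactor_turn(timeout: Option<Duration>) -> anyhow::Result<()> {
    // Submit any queued operations and block until at least one completes or the timeout fires.
    wait(timeout)?;
    // Wake every task whose operation has completed so the executor can poll it again.
    dispatch()
}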

enum OpStatus {
    New(squeue::Entry),
    Waiting(task::Waker),
    Ready(cqueue::Entry),
    Canceled(Option<fn(&mut State, cqueue::Entry)>, Box<dyn Any>),
    System(Box<dyn Any>),
    Processing,
    Finished,
}

struct Op<'a, B: Unpin + 'static> {
    state: Rc<RefCell<State>>,
    desc: &'a Arc<SafeDescriptor>,
    cleanup: Option<fn(&mut State, cqueue::Entry)>,
    buf: Option<B>,
    idx: usize,
}

impl<'a, B: Unpin + 'static> Future for Op<'a, B> {
    type Output = (anyhow::Result<cqueue::Entry>, Option<B>);

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let mut state = self.state.borrow_mut();
        let status = replace(&mut state.ops[self.idx], OpStatus::Processing);

        match status {
            // We don't want to submit the operation to the kernel until this future is polled
            // because polling requires pinning and pinning guarantees that our drop impl will be
            // called, which is necessary to ensure that resources shared with the kernel will live
            // for the lifetime of the operation.
            OpStatus::New(entry) => {
                // Safety: The parameters in `Entry` are owned by `Op` and will be transferred to
                // the thread state if this `Op` is dropped, guaranteeing that they are valid for
                // the lifetime of the operation. Also see above for the drop guarantee.
                let res = unsafe { state.submit_entry(&entry) };

                if let Err(e) = res {
                    drop(state);
                    return Poll::Ready((Err(e), self.buf.take()));
                }

                state.ops[self.idx] = OpStatus::Waiting(cx.waker().clone());
            }
            OpStatus::Waiting(w) if !w.will_wake(cx.waker()) => {
                state.ops[self.idx] = OpStatus::Waiting(cx.waker().clone())
            }
            // If `cx.waker()` and the currently stored waker are the same then no need to do
            // anything.
            OpStatus::Waiting(w) => state.ops[self.idx] = OpStatus::Waiting(w),
            OpStatus::Ready(entry) => {
                state.ops[self.idx] = OpStatus::Finished;
                drop(state);

                let buf = self.buf.take();
                return Poll::Ready((Ok(entry), buf));
            }
            OpStatus::Canceled(_, _) => panic!("`Op` polled after drop"),
            OpStatus::System(_) | OpStatus::Processing => panic!("Unexpected state for `OpStatus`"),
            OpStatus::Finished => panic!("`Op` polled after returning `Poll::Ready`"),
        }

        Poll::Pending
    }
}

impl<'a, B: Unpin + 'static> Drop for Op<'a, B> {
    fn drop(&mut self) {
        let mut state = self.state.borrow_mut();
        let status = replace(&mut state.ops[self.idx], OpStatus::Processing);

        if let OpStatus::Waiting(_) = status {
            // If we're still waiting for the IO to finish then we cannot free the resources until
            // the operation is complete.
            if let Err(e) = state.cancel_op(self.idx) {
                warn!("Failed to cancel dropped operation: {:#}", e);
            }

            // Now take ownership of any resources associated with the canceled operation.
            state.ops[self.idx] = OpStatus::Canceled(
                self.cleanup.take(),
                Box::new((self.desc.clone(), self.buf.take())),
            )
        } else {
            // We have not shared any resources with the kernel so we can clean up the `OpStatus` now.
            state.ops.remove(self.idx);
        }
    }
}

fn start_op<B: Unpin + 'static>(
    state: Rc<RefCell<State>>,
    entry: squeue::Entry,
    desc: &Arc<SafeDescriptor>,
    cleanup: Option<fn(&mut State, cqueue::Entry)>,
    buf: Option<B>,
) -> Op<B> {
    let idx = {
        let mut state = state.borrow_mut();
        let slot = state.ops.vacant_entry();
        let idx = slot.key();
        slot.insert(OpStatus::New(entry.user_data(idx as u64)));
        idx
    };
    Op {
        state,
        desc,
        cleanup,
        buf,
        idx,
    }
}

fn entry_to_result(entry: cqueue::Entry) -> anyhow::Result<usize> {
    let res = entry.result();
    if res < 0 {
        Err(anyhow!(io::Error::from_raw_os_error(-res)))
    } else {
        Ok(res as usize)
    }
}

fn return_selected_buffer(state: &mut State, entry: cqueue::Entry) {
    let inner = || {
        let SelectedBuffer {
            ptr,
            len: _,
            cap,
            bid,
        } = state.get_selected_buffer(entry)?;

        // Safety: we are returning a buffer that was previously provided to the kernel so we know
        // it must live as long as the `IoUring`.
        unsafe { state.provide_buffers(ptr, cap as i32, 1, BUFFER_GROUP, bid) }
    };

    if let Err(e) = inner() {
        warn!(
            "Failed to return selected buffer to kernel; buffer will be leaked: {:#}",
            e
        );
    }
}

pub async fn read(
    desc: &Arc<SafeDescriptor>,
    buf: &mut [u8],
    offset: Option<u64>,
) -> anyhow::Result<usize> {
    let len = buf
        .len()
        .try_into()
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
    let mut read =
        opcode::Read::new(Fd(desc.as_raw_fd()), ptr::null_mut(), len).buf_group(BUFFER_GROUP);
    if let Some(offset) = offset {
        read = read.offset(offset as libc::off64_t);
    }
    let entry = read.build().flags(squeue::Flags::BUFFER_SELECT);
    let state = clone_state()?;
    let (res, _) = start_op(state, entry, desc, Some(return_selected_buffer), None::<()>).await;
    with_state(|state| state.copy_from_selected_buffer(res?, buf))
}
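
// Usage sketch (hypothetical caller, assuming some executor drives the future): reads up to 64
// bytes from the start of `desc` through the shared buffer pool and copies them into `out`.
#[allow(dead_code)]
async fn example_read(desc: &Arc<SafeDescriptor>) -> anyhow::Result<usize> {
    let mut out = [0u8; 64];
    // `Some(0)` reads from offset 0; `None` would use the descriptor's current file position.
    read(desc, &mut out, Some(0)).await
}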

pub async fn read_iobuf<B: AsIoBufs + Unpin + 'static>(
    desc: &Arc<SafeDescriptor>,
    mut buf: B,
    offset: Option<u64>,
) -> (anyhow::Result<usize>, B) {
    let iobufs = IoBufMut::as_iobufs(buf.as_iobufs());
    let mut readv = opcode::Readv::new(Fd(desc.as_raw_fd()), iobufs.as_ptr(), iobufs.len() as u32);
    if let Some(off) = offset {
        readv = readv.offset(off as libc::off64_t);
    }

    let state = match clone_state() {
        Ok(s) => s,
        Err(e) => return (Err(e), buf),
    };
    let (res, buf) = start_op(state, readv.build(), desc, None, Some(buf)).await;
    (res.and_then(entry_to_result), buf.unwrap())
}

pub async fn write(
    desc: &Arc<SafeDescriptor>,
    buf: &[u8],
    offset: Option<u64>,
) -> anyhow::Result<usize> {
    // TODO: Maybe we should do something smarter here with a shared buffer pool like we do for
    // `read`.
    let (res, _) = write_iobuf(desc, OwnedIoBuf::new(buf.to_vec()), offset).await;
    res
}

pub async fn write_iobuf<B: AsIoBufs + Unpin + 'static>(
    desc: &Arc<SafeDescriptor>,
    mut buf: B,
    offset: Option<u64>,
) -> (anyhow::Result<usize>, B) {
    let iobufs = IoBufMut::as_iobufs(buf.as_iobufs());
    let mut writev =
        opcode::Writev::new(Fd(desc.as_raw_fd()), iobufs.as_ptr(), iobufs.len() as u32);
    if let Some(off) = offset {
        writev = writev.offset(off as libc::off64_t);
    }

    let state = match clone_state() {
        Ok(s) => s,
        Err(e) => return (Err(e), buf),
    };
    let (res, buf) = start_op(state, writev.build(), desc, None, Some(buf)).await;
    (res.and_then(entry_to_result), buf.unwrap())
}
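
// Usage sketch (hypothetical caller): writes an owned buffer with `write_iobuf` and always gets
// the buffer back, so it can be reused or inspected even when the operation fails.
#[allow(dead_code)]
async fn example_write_iobuf(desc: &Arc<SafeDescriptor>, data: Vec<u8>) -> anyhow::Result<usize> {
    let (res, _buf) = write_iobuf(desc, OwnedIoBuf::new(data), None).await;
    res
}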

pub async fn fallocate(
    desc: &Arc<SafeDescriptor>,
    file_offset: u64,
    len: u64,
    mode: u32,
) -> anyhow::Result<()> {
    let entry = opcode::Fallocate::new(Fd(desc.as_raw_fd()), len as libc::off64_t)
        .offset(file_offset as libc::off64_t)
        .mode(mode as libc::c_int)
        .build();
    let state = clone_state()?;
    let (res, _) = start_op(state, entry, desc, None, None::<()>).await;
    res.and_then(entry_to_result).map(drop)
}

pub async fn ftruncate(desc: &Arc<SafeDescriptor>, len: u64) -> anyhow::Result<()> {
    // Safe because this doesn't modify any memory and we check the return value.
    let ret = unsafe { libc::ftruncate64(desc.as_raw_descriptor(), len as libc::off64_t) };

    if ret == 0 {
        Ok(())
    } else {
        Err(io::Error::last_os_error().into())
    }
}

pub async fn stat(desc: &Arc<SafeDescriptor>) -> anyhow::Result<libc::stat64> {
    // TODO: use opcode::Statx
    let mut st = MaybeUninit::zeroed();

    // Safe because this will only modify `st` and we check the return value.
    let ret = unsafe { libc::fstat64(desc.as_raw_descriptor(), st.as_mut_ptr()) };

    if ret == 0 {
        // Safe because the kernel guarantees that `st` is now initialized.
        Ok(unsafe { st.assume_init() })
    } else {
        Err(io::Error::last_os_error().into())
    }
}

pub async fn fsync(desc: &Arc<SafeDescriptor>, datasync: bool) -> anyhow::Result<()> {
    let mut entry = opcode::Fsync::new(Fd(desc.as_raw_fd()));
    if datasync {
        entry = entry.flags(FsyncFlags::DATASYNC);
    }
    let state = clone_state()?;
    let (res, _) = start_op(state, entry.build(), desc, None, None::<()>).await;
    res.and_then(entry_to_result).map(drop)
}

pub async fn connect(
    desc: &Arc<SafeDescriptor>,
    addr: libc::sockaddr_un,
    len: libc::socklen_t,
) -> anyhow::Result<()> {
    ensure!(
        len <= size_of::<libc::sockaddr_un>() as libc::socklen_t,
        io::Error::from_raw_os_error(libc::EINVAL)
    );
    // TODO: Figure out a way to get rid of this box.
    let addr = Box::new(addr);

    let entry = opcode::Connect::new(
        Fd(desc.as_raw_fd()),
        &*addr as *const libc::sockaddr_un as *const _,
        len,
    )
    .build();
    let state = clone_state()?;
    let (res, _) = start_op(state, entry, desc, None, Some(addr)).await;

    res.and_then(entry_to_result).map(drop)
}

pub async fn next_packet_size(desc: &Arc<SafeDescriptor>) -> anyhow::Result<usize> {
    // For some reason, this always returns 0 under uring so use epoll-style for now. TODO: Figure
    // out how we can go back to using uring.
    #[cfg(not(debug_assertions))]
    let buf = ptr::null_mut();
    // Workaround for qemu's syscall translation, which will reject null pointers in recvfrom.
    // This only matters for running the unit tests for a non-native architecture. See the
    // upstream thread for the qemu fix:
    // https://lists.nongnu.org/archive/html/qemu-devel/2021-03/msg09027.html
    #[cfg(debug_assertions)]
    let buf = ptr::NonNull::dangling().as_ptr();

    loop {
        // Safe because this will not modify any memory and we check the return value.
        let ret = unsafe {
            libc::recvfrom(
                desc.as_raw_descriptor(),
                buf,
                0,
                libc::MSG_TRUNC | libc::MSG_PEEK | libc::MSG_DONTWAIT,
                ptr::null_mut(),
                ptr::null_mut(),
            )
        };

        if ret >= 0 {
            return Ok(ret as usize);
        }

        match sys_util::Error::last() {
            e if e.errno() == libc::EWOULDBLOCK || e.errno() == libc::EAGAIN => {
                wait_readable(desc).await?;
            }
            e => bail!(io::Error::from(e)),
        }
    }
}

pub async fn sendmsg(
    desc: &Arc<SafeDescriptor>,
    buf: &[u8],
    fds: &[RawFd],
) -> anyhow::Result<usize> {
    // TODO: Consider using a shared buffer pool.
    let (res, _) = send_iobuf_with_fds(desc, OwnedIoBuf::new(buf.to_vec()), fds).await;
    res
}
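
// Usage sketch (hypothetical caller): sends a one-byte payload along with a single file
// descriptor as a control message, which takes the epoll-style fallback path below.
#[allow(dead_code)]
async fn example_send_fd(desc: &Arc<SafeDescriptor>, fd: RawFd) -> anyhow::Result<usize> {
    sendmsg(desc, &[0u8], &[fd]).await
}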

pub async fn recvmsg(
    desc: &Arc<SafeDescriptor>,
    buf: &mut [u8],
    fds: &mut [RawFd],
) -> anyhow::Result<(usize, usize)> {
    // TODO: The io_uring crate doesn't support using BUFFER_SELECT for recvmsg even though it's
    // supported by the kernel.
    let (res, src) = recv_iobuf_with_fds(desc, OwnedIoBuf::new(vec![0u8; buf.len()]), fds).await;
    let (buflen, fd_count) = res?;
    let count = min(buflen, buf.len());
    buf[..count].copy_from_slice(&src[..count]);
    Ok((count, fd_count))
}

pub async fn send_iobuf_with_fds<B: AsIoBufs + Unpin + 'static>(
    desc: &Arc<SafeDescriptor>,
    mut buf: B,
    fds: &[RawFd],
) -> (anyhow::Result<usize>, B) {
    let iovs = IoBufMut::as_iobufs(buf.as_iobufs());
    let mut msg = libc::msghdr {
        msg_name: ptr::null_mut(),
        msg_namelen: 0,
        msg_iov: iovs.as_ptr() as *mut libc::iovec,
        msg_iovlen: iovs.len(),
        msg_flags: libc::MSG_NOSIGNAL,
        msg_control: ptr::null_mut(),
        msg_controllen: 0,
    };

    // `IORING_OP_SENDMSG` internally uses the `__sys_sendmsg_sock` kernel function, which
    // disallows control messages, so when we need to send fds we fall back to epoll-style async
    // operations.
    if !fds.is_empty() {
        let inner = async {
            let cmsg_buffer = add_fds_to_message(&mut msg, fds)?;

            loop {
                // Safe because this doesn't modify any memory and we check the return value.
                let ret = unsafe {
                    libc::sendmsg(
                        desc.as_raw_descriptor(),
                        &msg,
                        libc::MSG_NOSIGNAL | libc::MSG_DONTWAIT,
                    )
                };

                if ret >= 0 {
                    return Ok(ret as usize);
                }

                match sys_util::Error::last() {
                    e if e.errno() == libc::EWOULDBLOCK || e.errno() == libc::EAGAIN => {
                        wait_writable(desc).await?;
                    }
                    e => return Err(anyhow!(io::Error::from(e))),
                }
            }
        };
        (inner.await, buf)
    } else {
        let msg = Box::new(msg);
        let entry = opcode::SendMsg::new(Fd(desc.as_raw_descriptor()), &*msg).build();
        let state = match clone_state() {
            Ok(s) => s,
            Err(e) => return (Err(e), buf),
        };
        let (res, data) = start_op(state, entry, desc, None, Some((buf, msg))).await;
        (res.and_then(entry_to_result), data.unwrap().0)
    }
}

pub async fn recv_iobuf_with_fds<B: AsIoBufs + Unpin + 'static>(
    desc: &Arc<SafeDescriptor>,
    mut buf: B,
    fds: &mut [RawFd],
) -> (anyhow::Result<(usize, usize)>, B) {
    let iovs = IoBufMut::as_iobufs(buf.as_iobufs());
    // `IORING_OP_RECVMSG` internally uses the `__sys_recvmsg_sock` kernel function, which
    // disallows control messages, so when we need to receive fds we fall back to epoll-style
    // async operations.
    if !fds.is_empty() {
        let inner = async {
            let fd_cap = fds
                .len()
                .checked_mul(size_of::<RawFd>())
                .and_then(|l| u32::try_from(l).ok())
                .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidInput))?;
            let (cmsg_buffer, cmsg_cap) = allocate_cmsg_buffer(fd_cap)?;
            let mut msg = libc::msghdr {
                msg_name: ptr::null_mut(),
                msg_namelen: 0,
                msg_iov: iovs.as_ptr() as *mut libc::iovec,
                msg_iovlen: iovs.len(),
                msg_flags: 0,
                msg_control: cmsg_buffer.as_ptr(),
                msg_controllen: cmsg_cap,
            };

            let buflen = loop {
                // Safe because this will only modify `buf` and `cmsg_buffer` and we check the return value.
                let ret = unsafe {
                    libc::recvmsg(
                        desc.as_raw_descriptor(),
                        &mut msg,
                        libc::MSG_NOSIGNAL | libc::MSG_DONTWAIT,
                    )
                };

                if ret >= 0 {
                    break ret as usize;
                }

                match sys_util::Error::last() {
                    e if e.errno() == libc::EWOULDBLOCK || e.errno() == libc::EAGAIN => {
                        wait_readable(desc).await?;
                    }
                    e => return Err(anyhow!(io::Error::from(e))),
                }
            };

            let fd_count = take_fds_from_message(&msg, fds)?;
            Ok((buflen, fd_count))
        };
        (inner.await, buf)
    } else {
        let mut msg = Box::new(libc::msghdr {
            msg_name: ptr::null_mut(),
            msg_namelen: 0,
            msg_iov: iovs.as_ptr() as *mut libc::iovec,
            msg_iovlen: iovs.len(),
            msg_flags: libc::MSG_NOSIGNAL,
            msg_control: ptr::null_mut(),
            msg_controllen: 0,
        });

        let entry = opcode::RecvMsg::new(Fd(desc.as_raw_descriptor()), &mut *msg).build();
        let state = match clone_state() {
            Ok(s) => s,
            Err(e) => return (Err(e), buf),
        };
        let (res, data) = start_op(state, entry, desc, None, Some((buf, msg))).await;
        let (buf, msg) = data.unwrap();

        let inner = || {
            let buflen = res.and_then(entry_to_result)?;
            let fd_count = take_fds_from_message(&msg, fds)?;
            Ok((buflen, fd_count))
        };
        (inner(), buf)
    }
}

pub async fn accept(desc: &Arc<SafeDescriptor>) -> anyhow::Result<SafeDescriptor> {
    let entry = opcode::Accept::new(Fd(desc.as_raw_fd()), ptr::null_mut(), ptr::null_mut())
        .flags(libc::SOCK_CLOEXEC)
        .build();
    let state = clone_state()?;
    let (res, _) = start_op(state, entry, desc, None, None::<()>).await;

    // Safe because we own this fd.
    res.and_then(entry_to_result)
        .map(|fd| unsafe { SafeDescriptor::from_raw_descriptor(fd as _) })
}

pub async fn wait_readable(desc: &Arc<SafeDescriptor>) -> anyhow::Result<()> {
    let entry = opcode::PollAdd::new(Fd(desc.as_raw_fd()), libc::POLLIN as u32).build();
    let state = clone_state()?;
    let (res, _) = start_op(state, entry, desc, None, None::<()>).await;
    res.and_then(entry_to_result).map(drop)
}

pub async fn wait_writable(desc: &Arc<SafeDescriptor>) -> anyhow::Result<()> {
    let entry = opcode::PollAdd::new(Fd(desc.as_raw_fd()), libc::POLLOUT as u32).build();
    let state = clone_state()?;
    let (res, _) = start_op(state, entry, desc, None, None::<()>).await;
    res.and_then(entry_to_result).map(drop)
}

pub fn prepare(_fd: &dyn AsRawDescriptor) -> anyhow::Result<()> {
    Ok(())
}