1 // Copyright 2021 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 use std::{
6 alloc::Layout,
7 any::Any,
8 cell::RefCell,
9 cmp::min,
10 convert::{TryFrom, TryInto},
11 ffi::CStr,
12 future::Future,
13 io,
14 mem::{replace, size_of, MaybeUninit},
15 os::unix::io::{AsRawFd, RawFd},
16 pin::Pin,
17 ptr,
18 rc::Rc,
19 sync::Arc,
20 task::{self, Poll},
21 time::Duration,
22 };
23
24 use anyhow::{anyhow, bail, ensure, Context};
25 use data_model::IoBufMut;
26 use io_uring::{
27 cqueue::{self, buffer_select},
28 opcode, squeue,
29 types::{Fd, FsyncFlags, SubmitArgs, Timespec},
30 Builder, IoUring, Probe,
31 };
32 use once_cell::sync::{Lazy, OnceCell};
33 use slab::Slab;
34 use sys_util::{
35 error, warn, AsRawDescriptor, EventFd, FromRawDescriptor, LayoutAllocation, SafeDescriptor,
36 };
37 use thiserror::Error as ThisError;
38
39 use super::cmsg::*;
40 use crate::{AsIoBufs, OwnedIoBuf};
41
42 // For now all buffers live in the same buffer group.
43 const BUFFER_GROUP: u16 = 0;
44 // The top 8 bits of the buffer id encode the index of the LayoutAllocation and the bottom 8 bits
45 // encode the index of the buffer within that allocation.
46 const ALLOC_IDX_SHIFT: usize = 8;
47 const BUFFERS_PER_ALLOC: u16 = 32;
48 const BUFFER_IDX_MASK: u16 = (1 << ALLOC_IDX_SHIFT) - 1;
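// As an illustrative example (these exact values are not used anywhere below): `pack_buffer_id(3, 17)`
// packs to bid 0x0311, and `unpack_buffer_id(0x0311)` recovers (3, 17).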
49
50 // Number of entries in the ring.
51 const NUM_ENTRIES: u32 = 256;
52
53 // The user_data for the waker. Since our user_data is based on the index in a Slab we'll run out of
54 // memory well before a real operation gets usize::MAX as the index.
55 const WAKER_DATA: usize = usize::MAX;
56
57 // The global IoUring instance. Each thread-local IoUring shares its kernel backend with this
58 // instance.
59 static GLOBAL_URING: OnceCell<IoUring> = OnceCell::new();
60 static URING_STATUS: Lazy<UringStatus> = Lazy::new(|| {
61 let mut utsname = MaybeUninit::zeroed();
62
63 // Safe because this will only modify `utsname` and we check the return value.
64 let res = unsafe { libc::uname(utsname.as_mut_ptr()) };
65 if res < 0 {
66 return UringStatus::Disabled;
67 }
68
69 // Safe because the kernel has initialized `utsname`.
70 let utsname = unsafe { utsname.assume_init() };
71
72 // Safe because the pointer is valid and the kernel guarantees that this is a valid C string.
73 let release = unsafe { CStr::from_ptr(utsname.release.as_ptr()) };
74
75 let mut components = match release.to_str().map(|r| r.split('.').map(str::parse)) {
76 Ok(c) => c,
77 Err(_) => return UringStatus::Disabled,
78 };
79
80 // Kernels older than 5.10 either didn't support io_uring or had bugs in the implementation.
81 match (components.next(), components.next()) {
82 (Some(Ok(major)), Some(Ok(minor))) if (major, minor) >= (5, 10) => {
83 // The kernel version is new enough so check if we can actually make a uring context.
84 if probe_uring().is_ok() {
85 UringStatus::Enabled(major, minor)
86 } else {
87 UringStatus::Disabled
88 }
89 }
90 _ => UringStatus::Disabled,
91 }
92 });
93 static EXT_ARG_SUPPORTED: Lazy<bool> = Lazy::new(
94 || matches!(&*URING_STATUS, UringStatus::Enabled(major, minor) if (*major, *minor) >= (5, 11)),
95 );
96
97 #[derive(Debug)]
98 enum UringStatus {
99 Enabled(usize, usize),
100 Disabled,
101 }
102
103 thread_local! (static THREAD_STATE: OnceCell<Rc<RefCell<State>>> = OnceCell::new());
104 fn new_state() -> anyhow::Result<Rc<RefCell<State>>> {
105 State::new().map(RefCell::new).map(Rc::new)
106 }
107
108 fn with_state<F, R>(f: F) -> anyhow::Result<R>
109 where
110 F: FnOnce(&mut State) -> anyhow::Result<R>,
111 {
112 THREAD_STATE.with(|thread_state| {
113 let state = thread_state.get_or_try_init(new_state)?;
114 f(&mut state.borrow_mut())
115 })
116 }
117
118 fn clone_state() -> anyhow::Result<Rc<RefCell<State>>> {
119 THREAD_STATE.with(|thread_state| thread_state.get_or_try_init(new_state).map(Rc::clone))
120 }
121
122 #[derive(Debug, ThisError)]
123 enum ErrorContext {
124 #[error("`io_uring_enter` failed")]
125 EnterFailed,
126 #[error("failed to return buffer to kernel")]
127 ReturnBuffer,
128 #[error("`SubmissionQueue` full")]
129 SubmissionQueueFull,
130 }
131
132 fn probe_uring() -> anyhow::Result<()> {
133 const REQUIRED_OPS: &[u8] = &[
134 opcode::Accept::CODE,
135 opcode::AsyncCancel::CODE,
136 opcode::Connect::CODE,
137 opcode::Fallocate::CODE,
138 opcode::Fsync::CODE,
139 opcode::PollAdd::CODE,
140 opcode::ProvideBuffers::CODE,
141 opcode::Read::CODE,
142 opcode::Readv::CODE,
143 opcode::RecvMsg::CODE,
144 opcode::SendMsg::CODE,
145 opcode::Write::CODE,
146 opcode::Writev::CODE,
147 ];
148 let uring = IoUring::new(8)?;
149 let mut probe = Probe::new();
150 uring.submitter().register_probe(&mut probe)?;
151 if REQUIRED_OPS
152 .iter()
153 .all(|&opcode| probe.is_supported(opcode))
154 {
155 Ok(())
156 } else {
157 bail!("Not all required uring operations supported")
158 }
159 }
160
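/// Returns true when the io_uring backend can be used: the running kernel reports version 5.10 or
/// newer and supports every opcode checked by `probe_uring`.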
161 pub fn use_uring() -> bool {
162 match &*URING_STATUS {
163 UringStatus::Enabled(_, _) => true,
164 UringStatus::Disabled => false,
165 }
166 }
167
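// Per-thread reactor state: a thread-local `IoUring` sharing its kernel backend with `GLOBAL_URING`,
// the eventfd-backed waker used to break out of `io_uring_enter`, the slab of in-flight operations,
// and the buffer allocations registered with the kernel via `ProvideBuffers`.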
168 struct State {
169 uring: IoUring,
170 waker: Waker,
171 ops: Slab<OpStatus>,
172 buffers: [LayoutAllocation; 5],
173 }
174
175 impl State {
176 fn new() -> anyhow::Result<State> {
177 let global_uring = GLOBAL_URING.get_or_try_init(|| IoUring::new(NUM_ENTRIES))?;
178
179 // The `setup_attach_wq` call here ensures that each thread shares the same backend in the
180 // kernel but has its own separate completion and submission queues, avoiding the need to do
181 // expensive synchronization when touching those queues in userspace.
182 let uring = Builder::default()
183 .setup_attach_wq(global_uring.as_raw_fd())
184 .build(NUM_ENTRIES)?;
185 let waker = Waker::new()?;
186 let ops = Slab::new();
187 let buffers = [
188 new_buffer_allocation(64),
189 new_buffer_allocation(128),
190 new_buffer_allocation(256),
191 new_buffer_allocation(512),
192 new_buffer_allocation(1024),
193 ];
194 let mut state = State {
195 uring,
196 waker,
197 ops,
198 buffers,
199 };
200
201 for (idx, alloc) in state.buffers.iter().enumerate() {
202 let layout = alloc.layout();
203 debug_assert_eq!(layout.size(), layout.align() * BUFFERS_PER_ALLOC as usize);
204
205 // We can't use `State::provide_buffers` directly here because `state` is already
206 // borrowed by the for loop.
207 let entry = opcode::ProvideBuffers::new(
208 alloc.as_ptr(),
209 layout.align() as i32,
210 BUFFERS_PER_ALLOC,
211 BUFFER_GROUP,
212 pack_buffer_id(idx, 0),
213 )
214 .build()
215 .user_data(idx as u64);
216
217 // Safety: The allocation is valid for `layout.align() * BUFFERS_PER_ALLOC` bytes of
218 // memory and is valid for the lifetime of the `IoUring` because it lives in the same
219 // struct.
220 unsafe { state.uring.submission().push(&entry) }
221 .context("failed to submit `ProvideBuffers` operation")?;
222 }
223
224 // Wait for all the `ProvideBuffers` operations to finish.
225 let count = state
226 .uring
227 .submit_and_wait(state.buffers.len())
228 .context(ErrorContext::EnterFailed)?;
229 debug_assert_eq!(count, state.buffers.len());
230
231 for entry in state.uring.completion() {
232 if entry.result() < 0 {
233 return Err(io::Error::from_raw_os_error(-entry.result()))
234 .context("failed to provide buffers to io_uring");
235 }
236 }
237
238 // Now start the waker that other threads can use to break us out of an `io_uring_enter`
239 // syscall.
240 state.submit_waker()?;
241
242 Ok(state)
243 }
244
245 fn getevents(&mut self) -> anyhow::Result<()> {
246 let (submitter, squeue, _) = self.uring.split();
247 let to_submit = squeue.len();
248 let min_complete = 0;
249
250 // This flag should really be provided by the `io_uring` crate directly.
251 const IORING_ENTER_GETEVENTS: u32 = 1 << 0;
252
253 // We need to manually call `Submitter::enter` here because `submit_and_wait` will only add
254 // the `IORING_ENTER_GETEVENTS` flag when `want > 0`.
255 // Safety: the kernel will only read `to_submit` entries from the submission queue,
256 // which have all been initialized.
257 unsafe {
258 submitter.enter::<libc::sigset_t>(
259 to_submit as u32,
260 min_complete,
261 IORING_ENTER_GETEVENTS,
262 None,
263 )
264 }
265 .map(drop)
266 .context(ErrorContext::EnterFailed)
267 }
268
269 fn submit_timer(&mut self, ts: Box<Timespec>) -> anyhow::Result<()> {
270 let slot = self.ops.vacant_entry();
271 let entry = opcode::Timeout::new(&*ts)
272 .build()
273 .user_data(slot.key() as u64);
274
275 slot.insert(OpStatus::System(ts));
276
277 // Safety: the entry is valid and we can guarantee that the Timespec will live for the
278 // lifetime of the operation.
279 unsafe { self.submit_entry(&entry) }
280 }
281
282 fn wait(&mut self, timeout: Option<Duration>) -> anyhow::Result<()> {
283 if let Some(timeout) = timeout {
284 if timeout > Duration::from_secs(0) {
285 let ts = Timespec::new()
286 .sec(timeout.as_secs())
287 .nsec(timeout.subsec_nanos());
288 if *EXT_ARG_SUPPORTED {
289 let args = SubmitArgs::new().timespec(&ts);
290 self.uring
291 .submitter()
292 .submit_with_args(1, &args)
293 .map(drop)
294 .context(ErrorContext::EnterFailed)
295 } else {
296 // Since `IORING_ENTER_EXT_ARG` is not supported we need to add a `Timeout`
297 // operation and then do a regular wait.
298 self.submit_timer(Box::new(ts))?;
299 self.uring
300 .submit_and_wait(1)
301 .map(drop)
302 .context(ErrorContext::EnterFailed)
303 }
304 } else {
305 // A zero timeout means we should submit new operations and fetch any completed
306 // operations without blocking.
307 self.getevents()
308 }
309 } else {
310 self.uring
311 .submit_and_wait(1)
312 .map(drop)
313 .context(ErrorContext::EnterFailed)
314 }
315 }
316
317 // Dispatches all completed IO operations, waking the tasks that own them. If one of the completed
318 // operations was the thread waker, its buffer is returned to the kernel and the waker is resubmitted.
319 fn dispatch(&mut self) -> anyhow::Result<()> {
320 let mut waker_entry = None;
321 let mut needs_cleanup = Vec::new();
322 for entry in self.uring.completion() {
323 let idx = entry.user_data() as usize;
324 if idx == WAKER_DATA {
325 waker_entry = Some(entry);
326 continue;
327 }
328 let status = replace(&mut self.ops[idx], OpStatus::Ready(entry));
329 match status {
330 OpStatus::New(_) => {
331 panic!("Received completion for operation that has not been started")
332 }
333 OpStatus::Waiting(w) => w.wake(),
334 OpStatus::Ready(_) => panic!("Received completion for finished operation"),
335 OpStatus::Canceled(cleanup, _) => {
336 let entry = if let OpStatus::Ready(entry) = self.ops.remove(idx) {
337 entry
338 } else {
339 panic!();
340 };
341 if let Some(c) = cleanup {
342 needs_cleanup.push((c, entry));
343 }
344 }
345 OpStatus::System(_) => drop(self.ops.remove(idx)),
346 OpStatus::Processing | OpStatus::Finished => {
347 panic!("Unexpected state for `OpStatus`")
348 }
349 }
350 }
351
352 if !needs_cleanup.is_empty() || waker_entry.is_some() {
353 // When there is a completion queue overflow, we can end up in an infinite loop:
354 // submit_entry() -> cq_overflow() -> dispatch() -> provide_buffers() / submit_waker()
355 // -> submit_entry(). Now that we've drained the completion queue, submit any pending
356 // operations in the submission queue to break the loop.
357 if self.uring.submission().cq_overflow() {
358 self.uring.submit()?;
359 }
360 }
361
362 if let Some(entry) = waker_entry {
363 // We were woken up so return the buffer to the kernel and resubmit the waker.
364 let SelectedBuffer { ptr, len, cap, bid } = self
365 .get_selected_buffer(entry)
366 .context("failed to read from waker")?;
367 debug_assert_eq!(len, size_of::<u64>());
368
369 // Safety: this was a buffer that we previously provided so we know that it is valid and
370 // lives at least as long as the `IoUring`.
371 unsafe { self.provide_buffers(ptr, cap as i32, 1, BUFFER_GROUP, bid) }
372 .context(ErrorContext::ReturnBuffer)?;
373
374 self.submit_waker()?;
375 }
376
377 for (cleanup, entry) in needs_cleanup {
378 cleanup(self, entry);
379 }
380
381 Ok(())
382 }
383
384 // Safety: This function has the same safety requirements as `SubmissionQueue::push`, namely that
385 // the parameters of `entry` are valid and will be valid for the entire duration of the operation.
386 unsafe fn submit_entry(&mut self, entry: &squeue::Entry) -> anyhow::Result<()> {
387 if self.uring.submission().push(entry).is_err() {
388 if self.uring.submission().cq_overflow() {
389 self.dispatch()
390 .context("failed to dispatch completed ops during cqueue overflow")?;
391 }
392 self.uring.submit().context(ErrorContext::EnterFailed)?;
393 self.uring
394 .submission()
395 .push(entry)
396 .map_err(|_| io::Error::from_raw_os_error(libc::EBUSY))
397 .context(ErrorContext::SubmissionQueueFull)
398 } else {
399 Ok(())
400 }
401 }
402
403 fn submit_waker(&mut self) -> anyhow::Result<()> {
404 let entry = opcode::Read::new(
405 Fd(self.waker.0.as_raw_fd()),
406 ptr::null_mut(),
407 size_of::<u64>() as u32,
408 )
409 .buf_group(BUFFER_GROUP)
410 .build()
411 .user_data(WAKER_DATA as u64)
412 .flags(squeue::Flags::BUFFER_SELECT);
413
414 // Safety: the entry is valid and doesn't reference any memory.
415 unsafe { self.submit_entry(&entry) }
416 }
417
418 // Safety: `buffer` must be a valid pointer to `len * nbufs` bytes of memory and must live at
419 // least as long as `self`.
420 unsafe fn provide_buffers(
421 &mut self,
422 buffer: *mut u8,
423 len: i32,
424 nbufs: u16,
425 bgid: u16,
426 bid: u16,
427 ) -> anyhow::Result<()> {
428 let slot = self.ops.vacant_entry();
429 let idx = slot.key();
430 let entry = opcode::ProvideBuffers::new(buffer, len, nbufs, bgid, bid)
431 .build()
432 .user_data(idx as u64);
433
434 slot.insert(OpStatus::System(Box::new(())));
435
436 // Safety: `buffer` is a valid pointer to `len * nbufs` bytes of memory and will be valid
437 // for the lifetime of the `IoUring` because it lives at least as long as `self`.
438 self.submit_entry(&entry)
439 }
440
441 // Returns the buffer selected by the kernel for `entry`. Panics if no buffer was selected by
442 // the kernel.
443 fn get_selected_buffer(&self, entry: cqueue::Entry) -> anyhow::Result<SelectedBuffer> {
444 let len = entry_to_result(entry.clone())?;
445
446 let bid = buffer_select(entry.flags()).expect("No buffer selected");
447 let (alloc_idx, buffer_idx) = unpack_buffer_id(bid);
448 let alloc = &self.buffers[alloc_idx];
449 let layout = alloc.layout();
450 let cap = layout.align();
451
452 debug_assert!(len <= cap);
453 debug_assert!(buffer_idx * layout.align() <= layout.size() - len);
454
455 // Safety: the allocation is valid for at least `buffer_idx * layout.align()` bytes of
456 // memory.
457 let ptr = unsafe { alloc.as_ptr::<u8>().add(buffer_idx * layout.align()) };
458 Ok(SelectedBuffer { ptr, len, cap, bid })
459 }
460
461 // Copies data from the kernel-selected buffer into the user-provided buffer and returns the
462 // selected buffer to the kernel. Panics if no buffer was selected by the kernel.
463 fn copy_from_selected_buffer(
464 &mut self,
465 entry: cqueue::Entry,
466 buf: &mut [u8],
467 ) -> anyhow::Result<usize> {
468 let SelectedBuffer { ptr, len, cap, bid } = self.get_selected_buffer(entry)?;
469 let count = min(len, buf.len());
470
471 // Safety: both pointers point to at least `count` bytes of allocated memory.
472 unsafe { ptr::copy_nonoverlapping(ptr, buf.as_mut_ptr(), count) };
473
474 // Now that we've copied the data out we need to return the buffer to the kernel.
475 // Safety: this is a buffer that was previously registered with the kernel and the caller
476 // that registered it was required to guarantee that it lives as long as the `IoUring`.
477 // We're reusing that guarantee here.
478 unsafe { self.provide_buffers(ptr, cap as i32, 1, BUFFER_GROUP, bid) }
479 .context(ErrorContext::ReturnBuffer)?;
480
481 Ok(count)
482 }
483
484 fn cancel_op(&mut self, idx: usize) -> anyhow::Result<()> {
485 // We're still waiting for the underlying IO to complete so try to cancel it if we can.
486 let slot = self.ops.vacant_entry();
487 let cancel = opcode::AsyncCancel::new(idx as u64)
488 .build()
489 .user_data(slot.key() as u64);
490
491 slot.insert(OpStatus::System(Box::new(())));
492
493 // Safety: The entry is valid and doesn't reference any memory.
494 unsafe { self.submit_entry(&cancel) }.context("failed to submit async cancellation")
495 }
496
497 // TODO: Do we actually need any of this? Once the IoUring is dropped, the fd should be closed
498 // so it doesn't seem necessary for us to actually drain it. It would be weird if the kernel
499 // kept around references to memory once the uring fd is gone.
500 // fn shutdown(&mut self, deadline: Instant) -> anyhow::Result<()> {
501 // // Every async operation owns a reference to the `State` and either removes itself from
502 // // `self.ops` or changes its status to `Canceled` when it is dropped so `self.ops` shouldn't
503 // // contain anything other than canceled and system operations.
504 // let pending = self
505 // .ops
506 // .iter_mut()
507 // .filter_map(|(idx, op)| match replace(op, OpStatus::Processing) {
508 // OpStatus::System(data) => {
509 // *op = OpStatus::Canceled(data);
510 // Some(idx)
511 // }
512 // OpStatus::Canceled(data) => {
513 // *op = OpStatus::Canceled(data);
514 // None
515 // }
516 // _ => panic!(
517 // "Thread state dropped while there are still non-canceled operations pending"
518 // ),
519 // })
520 // .collect::<Vec<_>>();
521
522 // for idx in pending {
523 // self.cancel_op(idx)?;
524 // }
525
526 // // Wait for all the canceled operations to finish.
527 // if !self.ops.is_empty() {
528 // self.wait(
529 // self.ops.len(),
530 // Some(deadline.saturating_duration_since(Instant::now())),
531 // )?;
532 // }
533 // self.dispatch()?;
534
535 // let ext_arg_supported = *EXT_ARG_SUPPORTED;
536 // // When `IORING_ENTER_EXT_ARG` is not supported, there may still be a timer op left in
537 // // `self.ops`.
538 // if (ext_arg_supported && !self.ops.is_empty()) || (!ext_arg_supported && self.ops.len() > 1)
539 // {
540 // return Err(anyhow!(io::Error::from_raw_os_error(libc::ETIMEDOUT))).context(format!(
541 // "Still waiting for {} operations to finish",
542 // self.ops.len()
543 // ));
544 // }
545
546 // // The `Waker` is the last pending operation.
547 // self.waker.wake().context("failed to wake Waker")?;
548 // self.wait(1, Some(deadline.saturating_duration_since(Instant::now())))?;
549
550 // Ok(())
551 // }
552 }
553
554 // TODO: Do we actually need this? See State::shutdown above.
555 // impl Drop for State {
556 // fn drop(&mut self) {
557 // // How long we should wait to drain the `IoUring` before giving up.
558 // const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(10);
559 // if let Err(e) = self.shutdown(Instant::now() + SHUTDOWN_TIMEOUT) {
560 // process::abort();
561 // }
562 // }
563 // }
564
565 struct SelectedBuffer {
566 ptr: *mut u8,
567 len: usize,
568 cap: usize,
569 bid: u16,
570 }
571
572 fn new_buffer_allocation(size: usize) -> LayoutAllocation {
573 let layout = Layout::from_size_align(size * usize::from(BUFFERS_PER_ALLOC), size)
574 .expect("Invalid layout");
575 LayoutAllocation::uninitialized(layout)
576 }
577
578 fn pack_buffer_id(alloc_idx: usize, buffer_idx: usize) -> u16 {
579 debug_assert!(alloc_idx << ALLOC_IDX_SHIFT <= u16::MAX as usize);
580 debug_assert_eq!(buffer_idx & usize::from(BUFFER_IDX_MASK), buffer_idx);
581 ((alloc_idx << ALLOC_IDX_SHIFT) | buffer_idx) as u16
582 }
583
584 // Returns the index of the `LayoutAllocation` and the index of the buffer within that allocation.
585 fn unpack_buffer_id(bid: u16) -> (usize, usize) {
586 let alloc_idx = (bid >> ALLOC_IDX_SHIFT).into();
587 let buffer_idx = (bid & BUFFER_IDX_MASK).into();
588 (alloc_idx, buffer_idx)
589 }
590
591 pub struct Waker(EventFd);
592 impl Waker {
593 fn new() -> anyhow::Result<Waker> {
594 EventFd::new()
595 .map(Waker)
596 .map_err(|e| anyhow!(io::Error::from(e)))
597 }
598
599 fn try_clone(&self) -> anyhow::Result<Waker> {
600 self.0
601 .try_clone()
602 .map(Waker)
603 .map_err(|e| anyhow!(io::Error::from(e)))
604 }
605
606 pub fn wake(&self) -> anyhow::Result<()> {
607 self.0
608 .write(1)
609 .map(drop)
610 .map_err(|e| anyhow!(io::Error::from(e)))
611 }
612 }
613
614 pub fn new_waker() -> anyhow::Result<Waker> {
615 with_state(|state| state.waker.try_clone())
616 }
617
618 // Wait for more events.
619 pub fn wait(timeout: Option<Duration>) -> anyhow::Result<()> {
620 with_state(|state| state.wait(timeout))
621 }
622
623 // Wake up any tasks that are ready.
624 pub fn dispatch() -> anyhow::Result<()> {
625 with_state(|state| state.dispatch())
626 }
627
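// Lifecycle of an operation tracked in `State::ops`: an `Op` future starts as `New` (created but not
// yet submitted), becomes `Waiting` once pushed to the kernel, and `Ready` when its completion entry
// arrives. `Canceled` holds an optional cleanup hook plus any resources owned by an `Op` that was
// dropped while still in flight, `System` marks internal operations (`ProvideBuffers`, `Timeout`,
// `AsyncCancel`), and `Processing`/`Finished` are the transient and terminal markers used while a
// slot is being updated or after `poll` has returned `Poll::Ready`.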
628 enum OpStatus {
629 New(squeue::Entry),
630 Waiting(task::Waker),
631 Ready(cqueue::Entry),
632 Canceled(Option<fn(&mut State, cqueue::Entry)>, Box<dyn Any>),
633 System(Box<dyn Any>),
634 Processing,
635 Finished,
636 }
637
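// A single in-flight io_uring operation. It owns (via `desc` and `buf`) the resources the kernel may
// reference, an optional cleanup hook that runs when a dropped operation eventually completes, and
// the index of its `OpStatus` slot in the thread state.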
638 struct Op<'a, B: Unpin + 'static> {
639 state: Rc<RefCell<State>>,
640 desc: &'a Arc<SafeDescriptor>,
641 cleanup: Option<fn(&mut State, cqueue::Entry)>,
642 buf: Option<B>,
643 idx: usize,
644 }
645
646 impl<'a, B: Unpin + 'static> Future for Op<'a, B> {
647 type Output = (anyhow::Result<cqueue::Entry>, Option<B>);
648
649 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
650 let mut state = self.state.borrow_mut();
651 let status = replace(&mut state.ops[self.idx], OpStatus::Processing);
652
653 match status {
654 // We don't want to submit the operation to the kernel until this future is polled
655 // because polling requires pinning and pinning guarantees that our drop impl will be
656 // called, which is necessary to ensure that resources shared with the kernel will live
657 // for the lifetime of the operation.
658 OpStatus::New(entry) => {
659 // Safety: The parameters in `Entry` are owned by `Op` and will be transferred to
660 // the thread state if this `Op` is dropped, guaranteeing that they are valid for
661 // the lifetime of the operation. Also see above for the drop guarantee.
662 let res = unsafe { state.submit_entry(&entry) };
663
664 if let Err(e) = res {
665 drop(state);
666 return Poll::Ready((Err(e), self.buf.take()));
667 }
668
669 state.ops[self.idx] = OpStatus::Waiting(cx.waker().clone());
670 }
671 OpStatus::Waiting(w) if !w.will_wake(cx.waker()) => {
672 state.ops[self.idx] = OpStatus::Waiting(cx.waker().clone())
673 }
674 // If `cx.waker()` and the currently stored waker are the same then no need to do
675 // anything.
676 OpStatus::Waiting(w) => state.ops[self.idx] = OpStatus::Waiting(w),
677 OpStatus::Ready(entry) => {
678 state.ops[self.idx] = OpStatus::Finished;
679 drop(state);
680
681 let buf = self.buf.take();
682 return Poll::Ready((Ok(entry), buf));
683 }
684 OpStatus::Canceled(_, _) => panic!("`Op` polled after drop"),
685 OpStatus::System(_) | OpStatus::Processing => panic!("Unexpected state for `OpStatus`"),
686 OpStatus::Finished => panic!("`Op` polled after returning `Poll::Ready`"),
687 }
688
689 Poll::Pending
690 }
691 }
692
693 impl<'a, B: Unpin + 'static> Drop for Op<'a, B> {
694 fn drop(&mut self) {
695 let mut state = self.state.borrow_mut();
696 let status = replace(&mut state.ops[self.idx], OpStatus::Processing);
697
698 if let OpStatus::Waiting(_) = status {
699 // If we're still waiting for the IO to finish then we cannot free the resources until
700 // the operation is complete.
701 if let Err(e) = state.cancel_op(self.idx) {
702 warn!("Failed to cancel dropped operation: {:#}", e);
703 }
704
705 // Now take ownership of any resources associated with the canceled operation.
706 state.ops[self.idx] = OpStatus::Canceled(
707 self.cleanup.take(),
708 Box::new((self.desc.clone(), self.buf.take())),
709 )
710 } else {
711 // We have not shared any resources with the kernel so we can clean up the `OpStatus` now.
712 state.ops.remove(self.idx);
713 }
714 }
715 }
716
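// Registers a new operation in the thread state and wraps it in an `Op` future. The entry is not
// pushed to the kernel here; submission is deferred until the returned `Op` is first polled (see the
// comment in `Op::poll` for why).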
717 fn start_op<B: Unpin + 'static>(
718 state: Rc<RefCell<State>>,
719 entry: squeue::Entry,
720 desc: &Arc<SafeDescriptor>,
721 cleanup: Option<fn(&mut State, cqueue::Entry)>,
722 buf: Option<B>,
723 ) -> Op<B> {
724 let idx = {
725 let mut state = state.borrow_mut();
726 let slot = state.ops.vacant_entry();
727 let idx = slot.key();
728 slot.insert(OpStatus::New(entry.user_data(idx as u64)));
729 idx
730 };
731 Op {
732 state,
733 desc,
734 cleanup,
735 buf,
736 idx,
737 }
738 }
739
740 fn entry_to_result(entry: cqueue::Entry) -> anyhow::Result<usize> {
741 let res = entry.result();
742 if res < 0 {
743 Err(anyhow!(io::Error::from_raw_os_error(-res)))
744 } else {
745 Ok(res as usize)
746 }
747 }
748
749 fn return_selected_buffer(state: &mut State, entry: cqueue::Entry) {
750 let inner = || {
751 let SelectedBuffer {
752 ptr,
753 len: _,
754 cap,
755 bid,
756 } = state.get_selected_buffer(entry)?;
757
758 // Safety: we are returning a buffer that was previously provided to the kernel so we know
759 // it must live as long as the `IoUring`.
760 unsafe { state.provide_buffers(ptr, cap as i32, 1, BUFFER_GROUP, bid) }
761 };
762
763 if let Err(e) = inner() {
764 warn!(
765 "Failed to return selected buffer to kernel; buffer will be leaked: {:#}",
766 e
767 );
768 }
769 }
770
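/// Reads up to `buf.len()` bytes from `desc` into `buf`, letting the kernel pick a buffer from the
/// shared pool and copying the data out once the operation completes. A minimal usage sketch
/// (illustrative only; `read_header` and the 16-byte size are assumptions, not part of this crate):
///
/// ```ignore
/// async fn read_header(desc: &Arc<SafeDescriptor>) -> anyhow::Result<[u8; 16]> {
///     let mut header = [0u8; 16];
///     let n = read(desc, &mut header, Some(0)).await?;
///     anyhow::ensure!(n == header.len(), "short read");
///     Ok(header)
/// }
/// ```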
771 pub async fn read(
772 desc: &Arc<SafeDescriptor>,
773 buf: &mut [u8],
774 offset: Option<u64>,
775 ) -> anyhow::Result<usize> {
776 let len = buf
777 .len()
778 .try_into()
779 .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
780 let mut read =
781 opcode::Read::new(Fd(desc.as_raw_fd()), ptr::null_mut(), len).buf_group(BUFFER_GROUP);
782 if let Some(offset) = offset {
783 read = read.offset(offset as libc::off64_t);
784 }
785 let entry = read.build().flags(squeue::Flags::BUFFER_SELECT);
786 let state = clone_state()?;
787 let (res, _) = start_op(state, entry, desc, Some(return_selected_buffer), None::<()>).await;
788 with_state(|state| state.copy_from_selected_buffer(res?, buf))
789 }
790
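/// Vectored read into an owned buffer. The buffer is taken by value and handed back with the result
/// because the kernel may still reference its memory until the operation completes; keeping
/// ownership inside the future guarantees the buffer lives at least as long as the I/O.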
791 pub async fn read_iobuf<B: AsIoBufs + Unpin + 'static>(
792 desc: &Arc<SafeDescriptor>,
793 mut buf: B,
794 offset: Option<u64>,
795 ) -> (anyhow::Result<usize>, B) {
796 let iobufs = IoBufMut::as_iobufs(buf.as_iobufs());
797 let mut readv = opcode::Readv::new(Fd(desc.as_raw_fd()), iobufs.as_ptr(), iobufs.len() as u32);
798 if let Some(off) = offset {
799 readv = readv.offset(off as libc::off64_t);
800 }
801
802 let state = match clone_state() {
803 Ok(s) => s,
804 Err(e) => return (Err(e), buf),
805 };
806 let (res, buf) = start_op(state, readv.build(), desc, None, Some(buf)).await;
807 (res.and_then(entry_to_result), buf.unwrap())
808 }
809
810 pub async fn write(
811 desc: &Arc<SafeDescriptor>,
812 buf: &[u8],
813 offset: Option<u64>,
814 ) -> anyhow::Result<usize> {
815 // TODO: Maybe we should do something smarter here with a shared buffer pool like we do for
816 // `read`.
817 let (res, _) = write_iobuf(desc, OwnedIoBuf::new(buf.to_vec()), offset).await;
818 res
819 }
820
821 pub async fn write_iobuf<B: AsIoBufs + Unpin + 'static>(
822 desc: &Arc<SafeDescriptor>,
823 mut buf: B,
824 offset: Option<u64>,
825 ) -> (anyhow::Result<usize>, B) {
826 let iobufs = IoBufMut::as_iobufs(buf.as_iobufs());
827 let mut writev =
828 opcode::Writev::new(Fd(desc.as_raw_fd()), iobufs.as_ptr(), iobufs.len() as u32);
829 if let Some(off) = offset {
830 writev = writev.offset(off as libc::off64_t);
831 }
832
833 let state = match clone_state() {
834 Ok(s) => s,
835 Err(e) => return (Err(e), buf),
836 };
837 let (res, buf) = start_op(state, writev.build(), desc, None, Some(buf)).await;
838 (res.and_then(entry_to_result), buf.unwrap())
839 }
840
841 pub async fn fallocate(
842 desc: &Arc<SafeDescriptor>,
843 file_offset: u64,
844 len: u64,
845 mode: u32,
846 ) -> anyhow::Result<()> {
847 let entry = opcode::Fallocate::new(Fd(desc.as_raw_fd()), len as libc::off64_t)
848 .offset(file_offset as libc::off64_t)
849 .mode(mode as libc::c_int)
850 .build();
851 let state = clone_state()?;
852 let (res, _) = start_op(state, entry, desc, None, None::<()>).await;
853 res.and_then(entry_to_result).map(drop)
854 }
855
856 pub async fn ftruncate(desc: &Arc<SafeDescriptor>, len: u64) -> anyhow::Result<()> {
857 let ret = unsafe { libc::ftruncate64(desc.as_raw_descriptor(), len as libc::off64_t) };
858
859 if ret == 0 {
860 Ok(())
861 } else {
862 Err(io::Error::last_os_error().into())
863 }
864 }
865
866 pub async fn stat(desc: &Arc<SafeDescriptor>) -> anyhow::Result<libc::stat64> {
867 // TODO: use opcode::Statx
868 let mut st = MaybeUninit::zeroed();
869
870 // Safe because this will only modify `st` and we check the return value.
871 let ret = unsafe { libc::fstat64(desc.as_raw_descriptor(), st.as_mut_ptr()) };
872
873 if ret == 0 {
874 // Safe because the kernel guarantees that `st` is now initialized.
875 Ok(unsafe { st.assume_init() })
876 } else {
877 Err(io::Error::last_os_error().into())
878 }
879 }
880
881 pub async fn fsync(desc: &Arc<SafeDescriptor>, datasync: bool) -> anyhow::Result<()> {
882 let mut entry = opcode::Fsync::new(Fd(desc.as_raw_fd()));
883 if datasync {
884 entry = entry.flags(FsyncFlags::DATASYNC);
885 }
886 let state = clone_state()?;
887 let (res, _) = start_op(state, entry.build(), desc, None, None::<()>).await;
888 res.and_then(entry_to_result).map(drop)
889 }
890
891 pub async fn connect(
892 desc: &Arc<SafeDescriptor>,
893 addr: libc::sockaddr_un,
894 len: libc::socklen_t,
895 ) -> anyhow::Result<()> {
896 ensure!(
897 len <= size_of::<libc::sockaddr_un>() as libc::socklen_t,
898 io::Error::from_raw_os_error(libc::EINVAL)
899 );
900 // TODO: Figure out a way to get rid of this box.
901 let addr = Box::new(addr);
902
903 let entry = opcode::Connect::new(
904 Fd(desc.as_raw_fd()),
905 &*addr as *const libc::sockaddr_un as *const _,
906 len,
907 )
908 .build();
909 let state = clone_state()?;
910 let (res, _) = start_op(state, entry, desc, None, Some(addr)).await;
911
912 res.and_then(entry_to_result).map(drop)
913 }
914
915 pub async fn next_packet_size(desc: &Arc<SafeDescriptor>) -> anyhow::Result<usize> {
916 // For some reason, this always returns 0 under uring so use epoll-style for now. TODO: Figure
917 // out how we can go back to using uring.
918 #[cfg(not(debug_assertions))]
919 let buf = ptr::null_mut();
920 // Workaround for qemu's syscall translation, which rejects null pointers in recvfrom.
921 // This only matters for running the unit tests for a non-native architecture. See the
922 // upstream thread for the qemu fix:
923 // https://lists.nongnu.org/archive/html/qemu-devel/2021-03/msg09027.html
924 #[cfg(debug_assertions)]
925 let buf = ptr::NonNull::dangling().as_ptr();
926
927 loop {
928 // Safe because this will not modify any memory and we check the return value.
929 let ret = unsafe {
930 libc::recvfrom(
931 desc.as_raw_descriptor(),
932 buf,
933 0,
934 libc::MSG_TRUNC | libc::MSG_PEEK | libc::MSG_DONTWAIT,
935 ptr::null_mut(),
936 ptr::null_mut(),
937 )
938 };
939
940 if ret >= 0 {
941 return Ok(ret as usize);
942 }
943
944 match sys_util::Error::last() {
945 e if e.errno() == libc::EWOULDBLOCK || e.errno() == libc::EAGAIN => {
946 wait_readable(desc).await?;
947 }
948 e => bail!(io::Error::from(e)),
949 }
950 }
951 }
952
953 pub async fn sendmsg(
954 desc: &Arc<SafeDescriptor>,
955 buf: &[u8],
956 fds: &[RawFd],
957 ) -> anyhow::Result<usize> {
958 // TODO: Consider using a shared buffer pool.
959 let (res, _) = send_iobuf_with_fds(desc, OwnedIoBuf::new(buf.to_vec()), fds).await;
960 res
961 }
962
963 pub async fn recvmsg(
964 desc: &Arc<SafeDescriptor>,
965 buf: &mut [u8],
966 fds: &mut [RawFd],
967 ) -> anyhow::Result<(usize, usize)> {
968 // TODO: The io_uring crate doesn't support using BUFFER_SELECT for recvmsg even though it's
969 // supported by the kernel.
970 let (res, src) = recv_iobuf_with_fds(desc, OwnedIoBuf::new(vec![0u8; buf.len()]), fds).await;
971 let (buflen, fd_count) = res?;
972 let count = min(buflen, buf.len());
973 buf[..count].copy_from_slice(&src[..count]);
974 Ok((count, fd_count))
975 }
976
977 pub async fn send_iobuf_with_fds<B: AsIoBufs + Unpin + 'static>(
978 desc: &Arc<SafeDescriptor>,
979 mut buf: B,
980 fds: &[RawFd],
981 ) -> (anyhow::Result<usize>, B) {
982 let iovs = IoBufMut::as_iobufs(buf.as_iobufs());
983 let mut msg = libc::msghdr {
984 msg_name: ptr::null_mut(),
985 msg_namelen: 0,
986 msg_iov: iovs.as_ptr() as *mut libc::iovec,
987 msg_iovlen: iovs.len(),
988 msg_flags: libc::MSG_NOSIGNAL,
989 msg_control: ptr::null_mut(),
990 msg_controllen: 0,
991 };
992
993 // `IORING_OP_SENDMSG` internally uses the `__sys_sendmsg_sock` kernel function, which disallows
994 // control messages, so when we need to send fds we fall back to epoll-style async operations.
995 if !fds.is_empty() {
996 let inner = async {
997 let cmsg_buffer = add_fds_to_message(&mut msg, fds)?;
998
999 loop {
1000 // Safe because this doesn't modify any memory and we check the return value.
1001 let ret = unsafe {
1002 libc::sendmsg(
1003 desc.as_raw_descriptor(),
1004 &msg,
1005 libc::MSG_NOSIGNAL | libc::MSG_DONTWAIT,
1006 )
1007 };
1008
1009 if ret >= 0 {
1010 return Ok(ret as usize);
1011 }
1012
1013 match sys_util::Error::last() {
1014 e if e.errno() == libc::EWOULDBLOCK || e.errno() == libc::EAGAIN => {
1015 wait_writable(desc).await?;
1016 }
1017 e => return Err(anyhow!(io::Error::from(e))),
1018 }
1019 }
1020 };
1021 (inner.await, buf)
1022 } else {
1023 let msg = Box::new(msg);
1024 let entry = opcode::SendMsg::new(Fd(desc.as_raw_descriptor()), &*msg).build();
1025 let state = match clone_state() {
1026 Ok(s) => s,
1027 Err(e) => return (Err(e), buf),
1028 };
1029 let (res, data) = start_op(state, entry, desc, None, Some((buf, msg))).await;
1030 (res.and_then(entry_to_result), data.unwrap().0)
1031 }
1032 }
1033
1034 pub async fn recv_iobuf_with_fds<B: AsIoBufs + Unpin + 'static>(
1035 desc: &Arc<SafeDescriptor>,
1036 mut buf: B,
1037 fds: &mut [RawFd],
1038 ) -> (anyhow::Result<(usize, usize)>, B) {
1039 let iovs = IoBufMut::as_iobufs(buf.as_iobufs());
1040 // `IORING_OP_RECVMSG` internally uses the `__sys_recvmsg_sock` kernel function, which disallows
1041 // control messages, so when we need to receive fds we fall back to epoll-style async operations.
1042 if !fds.is_empty() {
1043 let inner = async {
1044 let fd_cap = fds
1045 .len()
1046 .checked_mul(size_of::<RawFd>())
1047 .and_then(|l| u32::try_from(l).ok())
1048 .ok_or_else(|| io::Error::from(io::ErrorKind::InvalidInput))?;
1049 let (cmsg_buffer, cmsg_cap) = allocate_cmsg_buffer(fd_cap)?;
1050 let mut msg = libc::msghdr {
1051 msg_name: ptr::null_mut(),
1052 msg_namelen: 0,
1053 msg_iov: iovs.as_ptr() as *mut libc::iovec,
1054 msg_iovlen: iovs.len(),
1055 msg_flags: 0,
1056 msg_control: cmsg_buffer.as_ptr(),
1057 msg_controllen: cmsg_cap,
1058 };
1059
1060 let buflen = loop {
1061 // Safe because this will only modify `buf` and `cmsg_buffer` and we check the return value.
1062 let ret = unsafe {
1063 libc::recvmsg(
1064 desc.as_raw_descriptor(),
1065 &mut msg,
1066 libc::MSG_NOSIGNAL | libc::MSG_DONTWAIT,
1067 )
1068 };
1069
1070 if ret >= 0 {
1071 break ret as usize;
1072 }
1073
1074 match sys_util::Error::last() {
1075 e if e.errno() == libc::EWOULDBLOCK || e.errno() == libc::EAGAIN => {
1076 wait_readable(desc).await?;
1077 }
1078 e => return Err(anyhow!(io::Error::from(e))),
1079 }
1080 };
1081
1082 let fd_count = take_fds_from_message(&msg, fds)?;
1083 Ok((buflen, fd_count))
1084 };
1085 (inner.await, buf)
1086 } else {
1087 let mut msg = Box::new(libc::msghdr {
1088 msg_name: ptr::null_mut(),
1089 msg_namelen: 0,
1090 msg_iov: iovs.as_ptr() as *mut libc::iovec,
1091 msg_iovlen: iovs.len(),
1092 msg_flags: libc::MSG_NOSIGNAL,
1093 msg_control: ptr::null_mut(),
1094 msg_controllen: 0,
1095 });
1096
1097 let entry = opcode::RecvMsg::new(Fd(desc.as_raw_descriptor()), &mut *msg).build();
1098 let state = match clone_state() {
1099 Ok(s) => s,
1100 Err(e) => return (Err(e), buf),
1101 };
1102 let (res, data) = start_op(state, entry, desc, None, Some((buf, msg))).await;
1103 let (buf, msg) = data.unwrap();
1104
1105 let inner = || {
1106 let buflen = res.and_then(entry_to_result)?;
1107 let fd_count = take_fds_from_message(&msg, fds)?;
1108 Ok((buflen, fd_count))
1109 };
1110 (inner(), buf)
1111 }
1112 }
1113
1114 pub async fn accept(desc: &Arc<SafeDescriptor>) -> anyhow::Result<SafeDescriptor> {
1115 let entry = opcode::Accept::new(Fd(desc.as_raw_fd()), ptr::null_mut(), ptr::null_mut())
1116 .flags(libc::SOCK_CLOEXEC)
1117 .build();
1118 let state = clone_state()?;
1119 let (res, _) = start_op(state, entry, desc, None, None::<()>).await;
1120
1121 // Safe because we own this fd.
1122 res.and_then(entry_to_result)
1123 .map(|fd| unsafe { SafeDescriptor::from_raw_descriptor(fd as _) })
1124 }
1125
1126 pub async fn wait_readable(desc: &Arc<SafeDescriptor>) -> anyhow::Result<()> {
1127 let entry = opcode::PollAdd::new(Fd(desc.as_raw_fd()), libc::POLLIN as u32).build();
1128 let state = clone_state()?;
1129 let (res, _) = start_op(state, entry, desc, None, None::<()>).await;
1130 res.and_then(entry_to_result).map(drop)
1131 }
1132
1133 pub async fn wait_writable(desc: &Arc<SafeDescriptor>) -> anyhow::Result<()> {
1134 let entry = opcode::PollAdd::new(Fd(desc.as_raw_fd()), libc::POLLOUT as u32).build();
1135 let state = clone_state()?;
1136 let (res, _) = start_op(state, entry, desc, None, None::<()>).await;
1137 res.and_then(entry_to_result).map(drop)
1138 }
1139
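// Nothing needs to be done to prepare a descriptor for use with the uring reactor; this presumably
// exists so callers can treat this backend and the epoll-style backend uniformly.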
1140 pub fn prepare(_fd: &dyn AsRawDescriptor) -> anyhow::Result<()> {
1141 Ok(())
1142 }
1143
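// A minimal illustrative test sketch (not part of the original module) exercising the buffer id
// packing helpers above; the module and test names are arbitrary.
#[cfg(test)]
mod buffer_id_tests {
    use super::*;

    #[test]
    fn buffer_id_round_trip() {
        for alloc_idx in 0..5 {
            for buffer_idx in 0..usize::from(BUFFERS_PER_ALLOC) {
                let bid = pack_buffer_id(alloc_idx, buffer_idx);
                assert_eq!(unpack_buffer_id(bid), (alloc_idx, buffer_idx));
            }
        }
    }
}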