// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// This file makes several casts from u8 pointers into more-aligned pointer types.
// We assume that the kernel will give us suitably aligned memory.
#![allow(clippy::cast_ptr_alignment)]

use std::collections::BTreeMap;
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::ptr::null;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use base::AsRawDescriptor;
use base::EventType;
use base::MappedRegion;
use base::MemoryMapping;
use base::MemoryMappingBuilder;
use base::Protection;
use base::RawDescriptor;
use data_model::IoBufMut;
use libc::c_void;
use remain::sorted;
use sync::Mutex;
use thiserror::Error as ThisError;

use crate::bindings::*;
use crate::syscalls::*;
/// Holds per-operation, user-specified data. The usage is up to the caller. The most common use is
/// for callers to identify each request.
pub type UserData = u64;

#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
    /// Failed to map the completion ring.
    #[error("Failed to mmap completion ring {0}")]
    MappingCompleteRing(base::MmapError),
    /// Failed to map submit entries.
    #[error("Failed to mmap submit entries {0}")]
    MappingSubmitEntries(base::MmapError),
    /// Failed to map the submit ring.
    #[error("Failed to mmap submit ring {0}")]
    MappingSubmitRing(base::MmapError),
    /// Too many ops are already queued.
    #[error("No space for more ring entries, try increasing the size passed to `new`")]
    NoSpace,
    /// The call to `io_uring_enter` failed with the given errno.
    #[error("Failed to enter io uring: {0}")]
    RingEnter(libc::c_int),
    /// The call to `io_uring_register` failed with the given errno.
    #[error("Failed to register operations for io uring: {0}")]
    RingRegister(libc::c_int),
    /// The call to `io_uring_setup` failed with the given errno.
    #[error("Failed to setup io uring {0}")]
    Setup(libc::c_int),
}
pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            RingEnter(errno) => io::Error::from_raw_os_error(errno),
            Setup(errno) => io::Error::from_raw_os_error(errno),
            e => io::Error::new(io::ErrorKind::Other, e),
        }
    }
}

/// Basic statistics about the operations that have been submitted to the uring.
#[derive(Default)]
pub struct URingStats {
    total_enter_calls: AtomicU64, // Number of times the uring has been entered.
    pub total_ops: AtomicU64,     // Total ops submitted to io_uring.
    total_complete: AtomicU64,    // Total ops completed by io_uring.
}

pub struct SubmitQueue {
    submit_ring: SubmitQueueState,
    submit_queue_entries: SubmitQueueEntries,
    io_vecs: Pin<Box<[IoBufMut<'static>]>>,
    submitting: usize, // The number of ops in the process of being submitted.
    pub added: usize,  // The number of ops added since the last call to `io_uring_enter`.
    num_sqes: usize,   // The total number of sqes allocated in shared memory.
}
// Helper functions to set io_uring_sqe bindgen union members in a less verbose manner.
impl io_uring_sqe {
    pub fn set_addr(&mut self, val: u64) {
        self.__bindgen_anon_2.addr = val;
    }
    pub fn set_off(&mut self, val: u64) {
        self.__bindgen_anon_1.off = val;
    }

    pub fn set_buf_index(&mut self, val: u16) {
        self.__bindgen_anon_4.buf_index = val;
    }

    pub fn set_rw_flags(&mut self, val: libc::c_int) {
        self.__bindgen_anon_3.rw_flags = val;
    }

    pub fn set_poll_events(&mut self, val: u32) {
        let val = if cfg!(target_endian = "big") {
            // Swap words on big-endian platforms to match the original ABI where poll_events was 16
            // bits wide.
            val.rotate_left(16)
        } else {
            val
        };
        self.__bindgen_anon_3.poll32_events = val;
    }
}

// Convert a file offset to the raw io_uring offset format.
// Some => explicit offset
// None => use current file position
fn file_offset_to_raw_offset(offset: Option<u64>) -> u64 {
    // File offsets are interpreted as off64_t inside io_uring, with -1 representing the current
    // file position.
    const USE_CURRENT_FILE_POS: libc::off64_t = -1;
    offset.unwrap_or(USE_CURRENT_FILE_POS as u64)
}
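
// For illustration, a minimal sketch of the conversion above: `Some(0)` maps to an explicit
// offset of 0, while `None` becomes the off64_t value -1 reinterpreted as a u64 (u64::MAX),
// which io_uring treats as "use the current file position".
//
//     assert_eq!(file_offset_to_raw_offset(Some(0)), 0);
//     assert_eq!(file_offset_to_raw_offset(None), u64::MAX);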

impl SubmitQueue {
    // Call `f` with the next available sqe or return an error if none are available.
    // After `f` returns, the sqe is appended to the kernel's queue.
    fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
    where
        F: FnMut(&mut io_uring_sqe, &mut libc::iovec),
    {
        if self.added == self.num_sqes {
            return Err(Error::NoSpace);
        }

        // Find the next free submission entry in the submit ring and fill it with an iovec.
        // The below raw pointer derefs are safe because the memory the pointers use lives as long
        // as the mmap in self.
        let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
        let next_tail = tail.wrapping_add(1);
        if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
            return Err(Error::NoSpace);
        }
        // `tail` is the next sqe to use.
        let index = (tail & self.submit_ring.ring_mask) as usize;
        let sqe = self.submit_queue_entries.get_mut(index).unwrap();

        f(sqe, self.io_vecs[index].as_mut());

        // Tells the kernel to use the new index when processing the entry at that index.
        self.submit_ring.set_array_entry(index, index as u32);
        // Ensure the above writes to sqe are seen before the tail is updated.
        // set_tail uses Release ordering when storing to the ring.
        self.submit_ring.pointers.set_tail(next_tail);

        self.added += 1;

        Ok(())
    }

    // Returns the number of entries that have been added to this SubmitQueue since the last time
    // `prepare_submit` was called.
    fn prepare_submit(&mut self) -> usize {
        let out = self.added - self.submitting;
        self.submitting = self.added;

        out
    }

    // Indicates that we failed to submit `count` entries to the kernel and that they should be
    // retried.
    fn fail_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
    }

    // Indicates that `count` entries have been submitted to the kernel and so the space may be
    // reused for new entries.
    fn complete_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
        self.added -= count;
    }

    unsafe fn add_rw_op(
        &mut self,
        ptr: *const u8,
        len: usize,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
        op: u8,
    ) -> Result<()> {
        self.prep_next_sqe(|sqe, iovec| {
            iovec.iov_base = ptr as *const libc::c_void as *mut _;
            iovec.iov_len = len;
            sqe.opcode = op;
            sqe.set_addr(iovec as *const _ as *const libc::c_void as u64);
            sqe.len = 1;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;

        Ok(())
    }
}

/// Enum to represent all io_uring operations
#[repr(u32)]
pub enum URingOperation {
    Nop = io_uring_op_IORING_OP_NOP,
    Readv = io_uring_op_IORING_OP_READV,
    Writev = io_uring_op_IORING_OP_WRITEV,
    Fsync = io_uring_op_IORING_OP_FSYNC,
    ReadFixed = io_uring_op_IORING_OP_READ_FIXED,
    WriteFixed = io_uring_op_IORING_OP_WRITE_FIXED,
    PollAdd = io_uring_op_IORING_OP_POLL_ADD,
    PollRemove = io_uring_op_IORING_OP_POLL_REMOVE,
    SyncFileRange = io_uring_op_IORING_OP_SYNC_FILE_RANGE,
    Sendmsg = io_uring_op_IORING_OP_SENDMSG,
    Recvmsg = io_uring_op_IORING_OP_RECVMSG,
    Timeout = io_uring_op_IORING_OP_TIMEOUT,
    TimeoutRemove = io_uring_op_IORING_OP_TIMEOUT_REMOVE,
    Accept = io_uring_op_IORING_OP_ACCEPT,
    AsyncCancel = io_uring_op_IORING_OP_ASYNC_CANCEL,
    LinkTimeout = io_uring_op_IORING_OP_LINK_TIMEOUT,
    Connect = io_uring_op_IORING_OP_CONNECT,
    Fallocate = io_uring_op_IORING_OP_FALLOCATE,
    Openat = io_uring_op_IORING_OP_OPENAT,
    Close = io_uring_op_IORING_OP_CLOSE,
    FilesUpdate = io_uring_op_IORING_OP_FILES_UPDATE,
    Statx = io_uring_op_IORING_OP_STATX,
    Read = io_uring_op_IORING_OP_READ,
    Write = io_uring_op_IORING_OP_WRITE,
    Fadvise = io_uring_op_IORING_OP_FADVISE,
    Madvise = io_uring_op_IORING_OP_MADVISE,
    Send = io_uring_op_IORING_OP_SEND,
    Recv = io_uring_op_IORING_OP_RECV,
    Openat2 = io_uring_op_IORING_OP_OPENAT2,
    EpollCtl = io_uring_op_IORING_OP_EPOLL_CTL,
    Splice = io_uring_op_IORING_OP_SPLICE,
    ProvideBuffers = io_uring_op_IORING_OP_PROVIDE_BUFFERS,
    RemoveBuffers = io_uring_op_IORING_OP_REMOVE_BUFFERS,
    Tee = io_uring_op_IORING_OP_TEE,
    Shutdown = io_uring_op_IORING_OP_SHUTDOWN,
    Renameat = io_uring_op_IORING_OP_RENAMEAT,
    Unlinkat = io_uring_op_IORING_OP_UNLINKAT,
    Mkdirat = io_uring_op_IORING_OP_MKDIRAT,
    Symlinkat = io_uring_op_IORING_OP_SYMLINKAT,
    Linkat = io_uring_op_IORING_OP_LINKAT,
}

/// Represents an allowlist of the restrictions to be registered to a uring.
#[derive(Default)]
pub struct URingAllowlist(Vec<io_uring_restriction>);

impl URingAllowlist {
    /// Creates a new `URingAllowlist` which allows no operations.
    pub fn new() -> Self {
        URingAllowlist::default()
    }

    /// Allow `operation` to be submitted to the submit queue of the io_uring.
    pub fn allow_submit_operation(&mut self, operation: URingOperation) -> &mut Self {
        self.0.push(io_uring_restriction {
            opcode: IORING_RESTRICTION_SQE_OP as u16,
            __bindgen_anon_1: io_uring_restriction__bindgen_ty_1 {
                sqe_op: operation as u8,
            },
            ..Default::default()
        });
        self
    }
}
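
// A minimal sketch of using an allowlist together with `URingContext::new` below, assuming the
// running kernel supports IORING_REGISTER_RESTRICTIONS:
//
//     let mut allowlist = URingAllowlist::new();
//     allowlist
//         .allow_submit_operation(URingOperation::Readv)
//         .allow_submit_operation(URingOperation::Writev);
//     // Only READV and WRITEV sqes will be accepted by this ring; everything else is rejected.
//     let uring = URingContext::new(16, Some(&allowlist)).unwrap();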

/// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
/// to the kernel and asynchronously handling the completion of these operations.
/// Use the various `add_*` functions to configure operations, then call `wait` to start
/// the operations and get any completed results. Each op is given a u64 user_data argument that is
/// used to identify the result when returned in the iterator provided by `wait`.
///
/// # Example polling an FD for readable status.
///
/// ```
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use std::path::Path;
/// # use base::EventType;
/// # use io_uring::URingContext;
/// let f = File::open(Path::new("/dev/zero")).unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// uring
///     .add_poll_fd(f.as_raw_fd(), EventType::Read, 454)
///     .unwrap();
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 454 as io_uring::UserData);
/// assert_eq!(res.unwrap(), 1 as u32);
///
/// ```
pub struct URingContext {
    ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
    pub submit_ring: Mutex<SubmitQueue>,
    pub complete_ring: CompleteQueueState,
    in_flight: AtomicUsize, // The number of pending operations.
    pub stats: URingStats,
}

impl URingContext {
    /// Creates a `URingContext` where the underlying uring has space for `num_entries`
    /// simultaneous operations. If `allowlist` is given, all operations other
    /// than those explicitly permitted by `allowlist` are prohibited.
    pub fn new(num_entries: usize, allowlist: Option<&URingAllowlist>) -> Result<URingContext> {
        let mut ring_params = io_uring_params::default();
        if allowlist.is_some() {
            // To register restrictions, a uring must start in a disabled state.
            ring_params.flags |= IORING_SETUP_R_DISABLED;
        }

        // The below unsafe block isolates the creation of the URingContext. Each step on its own
        // is unsafe. Using the uring FD for the mapping and the offsets returned by the kernel for
        // base addresses maintains safety guarantees assuming the kernel API guarantees are
        // trusted.
        unsafe {
            // Safe because the kernel is trusted to only modify params and `File` is created with
            // an FD that it takes complete ownership of.
            let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
            let ring_file = File::from_raw_fd(fd);

            // Register the restrictions if an allowlist was given.
            if let Some(restrictions) = allowlist {
                // Safe because IORING_REGISTER_RESTRICTIONS does not modify the memory and
                // `restrictions` contains a valid pointer and length.
                io_uring_register(
                    fd,
                    IORING_REGISTER_RESTRICTIONS,
                    restrictions.0.as_ptr() as *const c_void,
                    restrictions.0.len() as u32,
                )
                .map_err(Error::RingRegister)?;

                // Enable the URingContext since it was started in a disabled state.
                // Safe because IORING_REGISTER_ENABLE_RINGS does not modify any memory.
                io_uring_register(fd, IORING_REGISTER_ENABLE_RINGS, null::<c_void>(), 0)
                    .map_err(Error::RingRegister)?;
            }

            // Mmap the submit and completion queues.
            // Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any error
            // is checked.
            let submit_ring = SubmitQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.sq_off.array as usize
                        + ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitRing)?,
                &ring_params,
            );

            let num_sqe = ring_params.sq_entries as usize;
            let submit_queue_entries = SubmitQueueEntries {
                mmap: MemoryMappingBuilder::new(
                    ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQES))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitEntries)?,
                len: num_sqe,
            };

            let complete_ring = CompleteQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.cq_off.cqes as usize
                        + ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_CQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingCompleteRing)?,
                &ring_params,
            );

            Ok(URingContext {
                ring_file,
                submit_ring: Mutex::new(SubmitQueue {
                    submit_ring,
                    submit_queue_entries,
                    io_vecs: Pin::from(vec![IoBufMut::new(&mut []); num_sqe].into_boxed_slice()),
                    submitting: 0,
                    added: 0,
                    num_sqes: ring_params.sq_entries as usize,
                }),
                complete_ring,
                in_flight: AtomicUsize::new(0),
                stats: Default::default(),
            })
        }
    }

    /// Asynchronously writes to `fd` from the address given in `ptr`.
    /// # Safety
    /// `add_write` will write up to `len` bytes of data from the address given by `ptr`. This is
    /// only safe if the caller guarantees that the memory lives until the transaction is complete
    /// and that completion has been returned from the `wait` function. In addition there must not
    /// be other references to the data pointed to by `ptr` until the operation completes. Ensure
    /// that the fd remains open until the op completes as well.
    pub unsafe fn add_write(
        &self,
        ptr: *const u8,
        len: usize,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().add_rw_op(
            ptr,
            len,
            fd,
            offset,
            user_data,
            io_uring_op_IORING_OP_WRITEV as u8,
        )
    }
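
    // A minimal usage sketch for `add_write` (illustrative; `uring`, `file`, and `buf` are
    // assumed to already exist, to outlive the operation, and nothing else touches `buf`
    // until the completion is returned from `wait`):
    //
    //     let buf = [0xaau8; 4096];
    //     // SAFETY: `buf` and `file` live until the completion for user_data 55 is returned.
    //     unsafe {
    //         uring.add_write(buf.as_ptr(), buf.len(), file.as_raw_fd(), Some(0), 55)?;
    //     }
    //     let (user_data, res) = uring.wait()?.next().unwrap();
    //     assert_eq!(user_data, 55);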

    /// Asynchronously reads from `fd` to the address given in `ptr`.
    /// # Safety
    /// `add_read` will write up to `len` bytes of data to the address given by `ptr`. This is only
    /// safe if the caller guarantees there are no other references to that memory and that the
    /// memory lives until the transaction is complete and that completion has been returned from
    /// the `wait` function. In addition there must not be any mutable references to the data
    /// pointed to by `ptr` until the operation completes. Ensure that the fd remains open until
    /// the op completes as well.
    pub unsafe fn add_read(
        &self,
        ptr: *mut u8,
        len: usize,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().add_rw_op(
            ptr,
            len,
            fd,
            offset,
            user_data,
            io_uring_op_IORING_OP_READV as u8,
        )
    }

    /// # Safety
    /// See `add_writev`, but this accepts an iterator instead of a vector if there isn't already
    /// a vector in existence.
    pub unsafe fn add_writev_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_writev(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously writes to `fd` from the addresses given in `iovecs`.
    /// # Safety
    /// `add_writev` will write to the address given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until the
    /// transaction is complete and that completion has been returned from the `wait` function. In
    /// addition there must not be any mutable references to the data pointed to by `iovecs` until
    /// the operation completes. Ensure that the fd remains open until the op completes as well.
    /// The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_writev(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_WRITEV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// # Safety
    /// See `add_readv`, but this accepts an iterator instead of a vector if there isn't already
    /// a vector in existence.
    pub unsafe fn add_readv_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_readv(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously reads from `fd` to the addresses given in `iovecs`.
    /// # Safety
    /// `add_readv` will write to the address given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until the
    /// transaction is complete and that completion has been returned from the `wait` function. In
    /// addition there must not be any references to the data pointed to by `iovecs` until the
    /// operation completes. Ensure that the fd remains open until the op completes as well.
    /// The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_readv(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_READV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// Add a no-op operation that doesn't perform any IO. Useful for testing the performance of the
    /// io_uring itself and for waking up a thread that's blocked inside a wait() call.
    pub fn add_nop(&self, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_NOP as u8;
            sqe.fd = -1;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }
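
    // A minimal sketch of driving the ring with a no-op (illustrative; assumes `uring` was
    // created with `URingContext::new` above and the user_data value 42 is arbitrary):
    //
    //     uring.add_nop(42)?;
    //     // `wait` submits any queued sqes and blocks until at least one completion is ready.
    //     let (user_data, res) = uring.wait()?.next().unwrap();
    //     assert_eq!(user_data, 42);
    //     assert_eq!(res.unwrap(), 0);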

    /// Syncs all completed operations; the ordering with respect to in-flight async ops is not
    /// defined.
    pub fn add_fsync(&self, fd: RawFd, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_FSYNC as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// See the usage of `fallocate`; this asynchronously performs the same operation.
    pub fn add_fallocate(
        &self,
        fd: RawFd,
        offset: u64,
        len: u64,
        mode: u32,
        user_data: UserData,
    ) -> Result<()> {
        // Note that the len for fallocate is passed in the addr field of the sqe and the mode uses
        // the len field.
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_FALLOCATE as u8;

            sqe.fd = fd;
            sqe.set_addr(len);
            sqe.len = mode;
            sqe.set_off(offset);
            sqe.user_data = user_data;

            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Adds an FD to be polled based on the given flags.
    /// The user must keep the FD open until the operation completion is returned from
    /// `wait`.
    /// Note that io_uring is always a one-shot poll. After the fd is returned, it must be re-added
    /// to get future events.
    pub fn add_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_ADD as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Removes an FD that was previously added with `add_poll_fd`.
    pub fn remove_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_REMOVE as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Attempt to cancel an already issued request. `addr` must contain the user_data field of the
    /// request that should be cancelled. The cancellation request will complete with one of the
    /// following result codes. If found, the res field of the cqe will contain 0. If not found,
    /// res will contain -ENOENT. If found and cancellation was attempted, the res field will
    /// contain -EALREADY; in this case, the request may or may not terminate. In general, requests
    /// that are interruptible (like socket IO) will get cancelled, while disk IO requests cannot
    /// be cancelled if already started.
    pub fn async_cancel(&self, addr: UserData, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_ASYNC_CANCEL as u8;
            sqe.user_data = user_data;
            sqe.set_addr(addr);

            sqe.len = 0;
            sqe.fd = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }
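
    // A minimal sketch of cancelling an in-flight poll via `async_cancel` (illustrative; `uring`
    // and `fd` are assumed to exist and the user_data values 7 and 8 are arbitrary):
    //
    //     uring.add_poll_fd(fd, EventType::Read, 7)?;
    //     uring.submit()?;
    //     // Ask the kernel to cancel the request whose user_data is 7; the cancel op itself
    //     // completes with user_data 8.
    //     uring.async_cancel(7, 8)?;
    //     for (user_data, res) in uring.wait()? {
    //         println!("op {} completed with {:?}", user_data, res);
    //     }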

    // Calls io_uring_enter, submitting any new sqes that have been added to the submit queue and
    // waiting for `wait_nr` operations to complete.
    fn enter(&self, wait_nr: u64) -> Result<()> {
        let completed = self.complete_ring.num_completed();
        self.stats
            .total_complete
            .fetch_add(completed as u64, Ordering::Relaxed);
        self.in_flight.fetch_sub(completed, Ordering::Relaxed);

        let added = self.submit_ring.lock().prepare_submit();
        if added == 0 && wait_nr == 0 {
            return Ok(());
        }

        self.stats.total_enter_calls.fetch_add(1, Ordering::Relaxed);
        let flags = if wait_nr > 0 {
            IORING_ENTER_GETEVENTS
        } else {
            0
        };
        let res = unsafe {
            // Safe because the only memory modified is in the completion queue.
            io_uring_enter(self.ring_file.as_raw_fd(), added as u64, wait_nr, flags)
        };

        match res {
            Ok(_) => {
                self.submit_ring.lock().complete_submit(added);
                self.stats
                    .total_ops
                    .fetch_add(added as u64, Ordering::Relaxed);

                // Release store synchronizes with acquire load above.
                self.in_flight.fetch_add(added, Ordering::Release);
            }
            Err(e) => {
                // An EBUSY return means that some completed events must be processed before
                // submitting more, so wait for some to finish without pushing the new sqes in
                // that case.
                // An EINTR means we successfully submitted the events but were interrupted while
                // waiting, so just wait again.
                // Any other error should be propagated up.

                if e != libc::EINTR {
                    self.submit_ring.lock().fail_submit(added);
                }

                if wait_nr == 0 || (e != libc::EBUSY && e != libc::EINTR) {
                    return Err(Error::RingEnter(e));
                }

                loop {
                    // Safe because the only memory modified is in the completion queue.
                    let res =
                        unsafe { io_uring_enter(self.ring_file.as_raw_fd(), 0, wait_nr, flags) };
                    if res != Err(libc::EINTR) {
                        return res.map_err(Error::RingEnter);
                    }
                }
            }
        }

        Ok(())
    }

    /// Sends operations added with the `add_*` functions to the kernel.
    pub fn submit(&self) -> Result<()> {
        self.enter(0)
    }

    /// Sends operations added with the `add_*` functions to the kernel and returns an iterator to
    /// any completed operations. `wait` blocks until at least one completion is ready. If called
    /// without any new events added, this simply waits for any existing events to complete and
    /// returns as soon as one or more is ready.
    pub fn wait(&self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
        // We only want to wait for events if there aren't already events in the completion queue.
        let wait_nr = if self.complete_ring.num_ready() > 0 {
            0
        } else {
            1
        };

        // The CompletionQueue will iterate all completed ops.
        match self.enter(wait_nr) {
            Ok(()) => Ok(&self.complete_ring),
            // If we cannot submit any more entries then we need to pull stuff out of the completion
            // ring, so just return the completion ring. This can only happen when `wait_nr` is 0 so
            // we know there are already entries in the completion queue.
            Err(Error::RingEnter(libc::EBUSY)) => Ok(&self.complete_ring),
            Err(e) => Err(e),
        }
    }
}

impl AsRawFd for URingContext {
    fn as_raw_fd(&self) -> RawFd {
        self.ring_file.as_raw_fd()
    }
}

impl AsRawDescriptor for URingContext {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.ring_file.as_raw_descriptor()
    }
}

struct SubmitQueueEntries {
    mmap: MemoryMapping,
    len: usize,
}

impl SubmitQueueEntries {
    fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
        if index >= self.len {
            return None;
        }
        let mut_ref = unsafe {
            // Safe because the mut borrow of self restricts us to one mutable reference at a time
            // and we trust that the kernel has returned enough memory in io_uring_setup and mmap.
            &mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index)
        };
        // Clear any state.
        *mut_ref = io_uring_sqe::default();
        Some(mut_ref)
    }
}

struct SubmitQueueState {
    _mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    array: AtomicPtr<u32>,
}

impl SubmitQueueState {
    // # Safety
    // Safe iff `mmap` is created by mapping from a uring FD at the SQ_RING offset and params is
    // the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
        let ptr = mmap.as_ptr();
        // Transmutes are safe because a u32 is atomic on all supported architectures and the
        // pointer will live until after self is dropped because the mmap is owned.
        let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
        // This offset is guaranteed to be within the mmap so unwrap the result.
        let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
        let array = AtomicPtr::new(ptr.add(params.sq_off.array as usize) as *mut u32);
        SubmitQueueState {
            _mmap: mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            array,
        }
    }

    // Sets the kernel's array entry at the given `index` to `value`.
    fn set_array_entry(&self, index: usize, value: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to be written.
        unsafe {
            std::ptr::write_volatile(self.array.load(Ordering::Relaxed).add(index), value as u32);
        }
    }
}

#[derive(Default)]
struct CompleteQueueData {
    completed: usize,
    // For ops that pass in arrays of iovecs, they need to be valid for the duration of the
    // operation because the kernel might read them at any time.
    pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
}

pub struct CompleteQueueState {
    mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    cqes_offset: u32,
    data: Mutex<CompleteQueueData>,
}

impl CompleteQueueState {
    /// # Safety
    /// Safe iff `mmap` is created by mapping from a uring FD at the CQ_RING offset and params is
    /// the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
        let ptr = mmap.as_ptr();
        let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
        let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
        CompleteQueueState {
            mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            cqes_offset: params.cq_off.cqes,
            data: Default::default(),
        }
    }

    fn add_op_data(&self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
        self.data.lock().pending_op_addrs.insert(user_data, addrs);
    }

    fn get_cqe(&self, head: u32) -> &io_uring_cqe {
        unsafe {
            // Safe because we trust that the kernel has returned enough memory in io_uring_setup
            // and mmap and index is checked within range by the ring_mask.
            let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
                as *const io_uring_cqe;

            let index = head & self.ring_mask;

            &*cqes.add(index as usize)
        }
    }

    pub fn num_ready(&self) -> u32 {
        let tail = self.pointers.tail(Ordering::Acquire);
        let head = self.pointers.head(Ordering::Relaxed);

        tail.saturating_sub(head)
    }

    fn num_completed(&self) -> usize {
        let mut data = self.data.lock();
        ::std::mem::replace(&mut data.completed, 0)
    }

    fn pop_front(&self) -> Option<(UserData, std::io::Result<u32>)> {
        // Take the lock on self.data first so that 2 threads don't try to pop the same completed op
        // from the queue.
        let mut data = self.data.lock();

        // Safe because the pointers to the atomics are valid and the cqe must be in range
        // because the kernel provided mask is applied to the index.
        let head = self.pointers.head(Ordering::Relaxed);

        // Synchronize the read of tail after the read of head.
        if head == self.pointers.tail(Ordering::Acquire) {
            return None;
        }

        data.completed += 1;

        let cqe = self.get_cqe(head);
        let user_data = cqe.user_data;
        let res = cqe.res;

        // Free the addrs saved for this op.
        let _ = data.pending_op_addrs.remove(&user_data);

        // Store the new head and ensure the reads above complete before the kernel sees the
        // update to head; `set_head` uses `Release` ordering.
        let new_head = head.wrapping_add(1);
        self.pointers.set_head(new_head);

        let io_res = match res {
            r if r < 0 => Err(std::io::Error::from_raw_os_error(-r)),
            r => Ok(r as u32),
        };
        Some((user_data, io_res))
    }
}

// Return the completed ops with their result.
impl<'c> Iterator for &'c CompleteQueueState {
    type Item = (UserData, std::io::Result<u32>);

    fn next(&mut self) -> Option<Self::Item> {
        self.pop_front()
    }
}

struct QueuePointers {
    head: *const AtomicU32,
    tail: *const AtomicU32,
}

// Rust pointers don't implement Send or Sync but in this case both fields are atomics and so it's
// safe to send the pointers between threads or access them concurrently from multiple threads.
unsafe impl Send for QueuePointers {}
unsafe impl Sync for QueuePointers {}

impl QueuePointers {
    // Loads the tail pointer atomically with the given ordering.
    fn tail(&self, ordering: Ordering) -> u32 {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.tail).load(ordering) }
    }

    // Stores the new value of the tail in the submit queue. This allows the kernel to start
    // processing entries that have been added up until the given tail pointer.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_tail(&self, next_tail: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.tail).store(next_tail, Ordering::Release) }
    }

    // Loads the head pointer atomically with the given ordering.
    fn head(&self, ordering: Ordering) -> u32 {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.head).load(ordering) }
    }

    // Stores the new value of the head in the completion queue. This tells the kernel that the
    // completed entries up to the given head pointer have been consumed and their slots can be
    // reused.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_head(&self, next_head: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.head).store(next_head, Ordering::Release) }
    }
}