// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// This file makes several casts from u8 pointers into more-aligned pointer types.
// We assume that the kernel will give us suitably aligned memory.
#![allow(clippy::cast_ptr_alignment)]

use std::collections::BTreeMap;
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::ptr::null;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;

use base::AsRawDescriptor;
use base::EventType;
use base::MappedRegion;
use base::MemoryMapping;
use base::MemoryMappingBuilder;
use base::Protection;
use base::RawDescriptor;
use data_model::IoBufMut;
use libc::c_void;
use remain::sorted;
use sync::Mutex;
use thiserror::Error as ThisError;

use crate::bindings::*;
use crate::syscalls::*;

/// Holds per-operation, user specified data. The usage is up to the caller. The most common use is
/// for callers to identify each request.
pub type UserData = u64;

#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
    /// Failed to map the completion ring.
    #[error("Failed to mmap completion ring {0}")]
    MappingCompleteRing(base::MmapError),
    /// Failed to map submit entries.
    #[error("Failed to mmap submit entries {0}")]
    MappingSubmitEntries(base::MmapError),
    /// Failed to map the submit ring.
    #[error("Failed to mmap submit ring {0}")]
    MappingSubmitRing(base::MmapError),
    /// Too many ops are already queued.
    #[error("No space for more ring entries, try increasing the size passed to `new`")]
    NoSpace,
    /// The call to `io_uring_enter` failed with the given errno.
    #[error("Failed to enter io uring: {0}")]
    RingEnter(libc::c_int),
    /// The call to `io_uring_register` failed with the given errno.
    #[error("Failed to register operations for io uring: {0}")]
    RingRegister(libc::c_int),
    /// The call to `io_uring_setup` failed with the given errno.
    #[error("Failed to setup io uring {0}")]
    Setup(libc::c_int),
}
pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            RingEnter(errno) => io::Error::from_raw_os_error(errno),
            Setup(errno) => io::Error::from_raw_os_error(errno),
            e => io::Error::new(io::ErrorKind::Other, e),
        }
    }
}

/// Basic statistics about the operations that have been submitted to the uring.
#[derive(Default)]
pub struct URingStats {
    total_enter_calls: AtomicU64, // Number of times the uring has been entered.
    pub total_ops: AtomicU64,     // Total ops submitted to io_uring.
    total_complete: AtomicU64,    // Total ops completed by io_uring.
}

pub struct SubmitQueue {
    submit_ring: SubmitQueueState,
    submit_queue_entries: SubmitQueueEntries,
    io_vecs: Pin<Box<[IoBufMut<'static>]>>,
    submitting: usize, // The number of ops in the process of being submitted.
    pub added: usize,  // The number of ops added since the last call to `io_uring_enter`.
    num_sqes: usize,   // The total number of sqes allocated in shared memory.
}

// Helper functions to set io_uring_sqe bindgen union members in a less verbose manner.
impl io_uring_sqe {
    pub fn set_addr(&mut self, val: u64) {
        self.__bindgen_anon_2.addr = val;
    }
    pub fn set_off(&mut self, val: u64) {
        self.__bindgen_anon_1.off = val;
    }

    pub fn set_buf_index(&mut self, val: u16) {
        self.__bindgen_anon_4.buf_index = val;
    }

    pub fn set_rw_flags(&mut self, val: libc::c_int) {
        self.__bindgen_anon_3.rw_flags = val;
    }

    pub fn set_poll_events(&mut self, val: u32) {
        let val = if cfg!(target_endian = "big") {
            // Swap words on big-endian platforms to match the original ABI where poll_events was 16
            // bits wide.
            val.rotate_left(16)
        } else {
            val
        };
        self.__bindgen_anon_3.poll32_events = val;
    }
}

// Convert a file offset to the raw io_uring offset format.
// Some => explicit offset
// None => use current file position
fn file_offset_to_raw_offset(offset: Option<u64>) -> u64 {
    // File offsets are interpreted as off64_t inside io_uring, with -1 representing the current
    // file position.
    const USE_CURRENT_FILE_POS: libc::off64_t = -1;
    offset.unwrap_or(USE_CURRENT_FILE_POS as u64)
}
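
// Illustrative sketch (added for clarity, not part of the original source): the two cases of the
// mapping above, relying on `-1 as u64` being `u64::MAX`.
//
//     assert_eq!(file_offset_to_raw_offset(Some(4096)), 4096);
//     assert_eq!(file_offset_to_raw_offset(None), u64::MAX);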

impl SubmitQueue {
    // Call `f` with the next available sqe or return an error if none are available.
    // After `f` returns, the sqe is appended to the kernel's queue.
    fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
    where
        F: FnMut(&mut io_uring_sqe, &mut libc::iovec),
    {
        if self.added == self.num_sqes {
            return Err(Error::NoSpace);
        }

        // Find the next free submission entry in the submit ring and fill it with an iovec.
        // The below raw pointer derefs are safe because the memory the pointers use lives as long
        // as the mmap in self.
        let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
        let next_tail = tail.wrapping_add(1);
        if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
            return Err(Error::NoSpace);
        }
        // `tail` is the next sqe to use.
        let index = (tail & self.submit_ring.ring_mask) as usize;
        let sqe = self.submit_queue_entries.get_mut(index).unwrap();

        f(sqe, self.io_vecs[index].as_mut());

        // Tells the kernel to use the new index when processing the entry at that index.
        self.submit_ring.set_array_entry(index, index as u32);
        // Ensure the above writes to sqe are seen before the tail is updated.
        // set_tail uses Release ordering when storing to the ring.
        self.submit_ring.pointers.set_tail(next_tail);

        self.added += 1;

        Ok(())
    }

    // Returns the number of entries that have been added to this SubmitQueue since the last time
    // `prepare_submit` was called.
    fn prepare_submit(&mut self) -> usize {
        let out = self.added - self.submitting;
        self.submitting = self.added;

        out
    }

    // Indicates that we failed to submit `count` entries to the kernel and that they should be
    // retried.
    fn fail_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
    }

    // Indicates that `count` entries have been submitted to the kernel and so the space may be
    // reused for new entries.
    fn complete_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
        self.added -= count;
    }

    unsafe fn add_rw_op(
        &mut self,
        ptr: *const u8,
        len: usize,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
        op: u8,
    ) -> Result<()> {
        self.prep_next_sqe(|sqe, iovec| {
            iovec.iov_base = ptr as *const libc::c_void as *mut _;
            iovec.iov_len = len;
            sqe.opcode = op;
            sqe.set_addr(iovec as *const _ as *const libc::c_void as u64);
            sqe.len = 1;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;

        Ok(())
    }
}

/// Enum to represent all io_uring operations
#[repr(u32)]
pub enum URingOperation {
    Nop = io_uring_op_IORING_OP_NOP,
    Readv = io_uring_op_IORING_OP_READV,
    Writev = io_uring_op_IORING_OP_WRITEV,
    Fsync = io_uring_op_IORING_OP_FSYNC,
    ReadFixed = io_uring_op_IORING_OP_READ_FIXED,
    WriteFixed = io_uring_op_IORING_OP_WRITE_FIXED,
    PollAdd = io_uring_op_IORING_OP_POLL_ADD,
    PollRemove = io_uring_op_IORING_OP_POLL_REMOVE,
    SyncFileRange = io_uring_op_IORING_OP_SYNC_FILE_RANGE,
    Sendmsg = io_uring_op_IORING_OP_SENDMSG,
    Recvmsg = io_uring_op_IORING_OP_RECVMSG,
    Timeout = io_uring_op_IORING_OP_TIMEOUT,
    TimeoutRemove = io_uring_op_IORING_OP_TIMEOUT_REMOVE,
    Accept = io_uring_op_IORING_OP_ACCEPT,
    AsyncCancel = io_uring_op_IORING_OP_ASYNC_CANCEL,
    LinkTimeout = io_uring_op_IORING_OP_LINK_TIMEOUT,
    Connect = io_uring_op_IORING_OP_CONNECT,
    Fallocate = io_uring_op_IORING_OP_FALLOCATE,
    Openat = io_uring_op_IORING_OP_OPENAT,
    Close = io_uring_op_IORING_OP_CLOSE,
    FilesUpdate = io_uring_op_IORING_OP_FILES_UPDATE,
    Statx = io_uring_op_IORING_OP_STATX,
    Read = io_uring_op_IORING_OP_READ,
    Write = io_uring_op_IORING_OP_WRITE,
    Fadvise = io_uring_op_IORING_OP_FADVISE,
    Madvise = io_uring_op_IORING_OP_MADVISE,
    Send = io_uring_op_IORING_OP_SEND,
    Recv = io_uring_op_IORING_OP_RECV,
    Openat2 = io_uring_op_IORING_OP_OPENAT2,
    EpollCtl = io_uring_op_IORING_OP_EPOLL_CTL,
    Splice = io_uring_op_IORING_OP_SPLICE,
    ProvideBuffers = io_uring_op_IORING_OP_PROVIDE_BUFFERS,
    RemoveBuffers = io_uring_op_IORING_OP_REMOVE_BUFFERS,
    Tee = io_uring_op_IORING_OP_TEE,
    Shutdown = io_uring_op_IORING_OP_SHUTDOWN,
    Renameat = io_uring_op_IORING_OP_RENAMEAT,
    Unlinkat = io_uring_op_IORING_OP_UNLINKAT,
    Mkdirat = io_uring_op_IORING_OP_MKDIRAT,
    Symlinkat = io_uring_op_IORING_OP_SYMLINKAT,
    Linkat = io_uring_op_IORING_OP_LINKAT,
}

/// Represents an allowlist of the restrictions to be registered to a uring.
#[derive(Default)]
pub struct URingAllowlist(Vec<io_uring_restriction>);

impl URingAllowlist {
    /// Create a new `URingAllowlist` which allows no operations.
    pub fn new() -> Self {
        URingAllowlist::default()
    }

    /// Allow `operation` to be submitted to the submit queue of the io_uring.
    pub fn allow_submit_operation(&mut self, operation: URingOperation) -> &mut Self {
        self.0.push(io_uring_restriction {
            opcode: IORING_RESTRICTION_SQE_OP as u16,
            __bindgen_anon_1: io_uring_restriction__bindgen_ty_1 {
                sqe_op: operation as u8,
            },
            ..Default::default()
        });
        self
    }
}
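
// Usage sketch (illustrative, assuming the caller wants to restrict a ring to readv/writev):
// build the allowlist first, then hand it to `URingContext::new` below. Operations whose opcodes
// are not on the allowlist will be rejected by the kernel.
//
//     let mut allowlist = URingAllowlist::new();
//     allowlist
//         .allow_submit_operation(URingOperation::Readv)
//         .allow_submit_operation(URingOperation::Writev);
//     let uring = URingContext::new(16, Some(&allowlist)).unwrap();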

/// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
/// to the kernel and asynchronously handling the completion of these operations.
/// Use the various `add_*` functions to configure operations, then call `wait` to start
/// the operations and get any completed results. Each op is given a u64 user_data argument that is
/// used to identify the result when returned in the iterator provided by `wait`.
///
/// # Example polling an FD for readable status.
///
/// ```
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use std::path::Path;
/// # use base::EventType;
/// # use io_uring::URingContext;
/// let f = File::open(Path::new("/dev/zero")).unwrap();
/// let uring = URingContext::new(16, None).unwrap();
/// uring
///   .add_poll_fd(f.as_raw_fd(), EventType::Read, 454)
///   .unwrap();
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 454 as io_uring::UserData);
/// assert_eq!(res.unwrap(), 1 as u32);
/// ```
pub struct URingContext {
    ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
    pub submit_ring: Mutex<SubmitQueue>,
    pub complete_ring: CompleteQueueState,
    in_flight: AtomicUsize, // The number of pending operations.
    pub stats: URingStats,
}

impl URingContext {
    /// Creates a `URingContext` where the underlying uring has space for `num_entries`
    /// simultaneous operations. If `allowlist` is given, all operations other
    /// than those explicitly permitted by `allowlist` are prohibited.
    pub fn new(num_entries: usize, allowlist: Option<&URingAllowlist>) -> Result<URingContext> {
        let mut ring_params = io_uring_params::default();
        if allowlist.is_some() {
            // To register restrictions, a uring must start in a disabled state.
            ring_params.flags |= IORING_SETUP_R_DISABLED;
        }

        // The below unsafe block isolates the creation of the URingContext. Each step on its own
        // is unsafe. Using the uring FD for the mapping and the offsets returned by the kernel for
        // base addresses maintains safety guarantees assuming the kernel API guarantees are
        // trusted.
        unsafe {
            // Safe because the kernel is trusted to only modify params and `File` is created with
            // an FD that it takes complete ownership of.
            let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
            let ring_file = File::from_raw_fd(fd);

            // Register the restrictions if an allowlist is given.
            if let Some(restrictions) = allowlist {
                // Safe because IORING_REGISTER_RESTRICTIONS does not modify the memory and `restrictions`
                // contains a valid pointer and length.
                io_uring_register(
                    fd,
                    IORING_REGISTER_RESTRICTIONS,
                    restrictions.0.as_ptr() as *const c_void,
                    restrictions.0.len() as u32,
                )
                .map_err(Error::RingRegister)?;

                // Enable the URingContext since it was started in a disabled state.
                // Safe because IORING_REGISTER_ENABLE_RINGS does not modify the memory.
                io_uring_register(fd, IORING_REGISTER_ENABLE_RINGS, null::<c_void>(), 0)
                    .map_err(Error::RingRegister)?;
            }

            // Mmap the submit and completion queues.
            // Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any error
            // is checked.
            let submit_ring = SubmitQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.sq_off.array as usize
                        + ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitRing)?,
                &ring_params,
            );

            let num_sqe = ring_params.sq_entries as usize;
            let submit_queue_entries = SubmitQueueEntries {
                mmap: MemoryMappingBuilder::new(
                    ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_SQES))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingSubmitEntries)?,
                len: num_sqe,
            };

            let complete_ring = CompleteQueueState::new(
                MemoryMappingBuilder::new(
                    ring_params.cq_off.cqes as usize
                        + ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
                )
                .from_file(&ring_file)
                .offset(u64::from(IORING_OFF_CQ_RING))
                .protection(Protection::read_write())
                .populate()
                .build()
                .map_err(Error::MappingCompleteRing)?,
                &ring_params,
            );

            Ok(URingContext {
                ring_file,
                submit_ring: Mutex::new(SubmitQueue {
                    submit_ring,
                    submit_queue_entries,
                    io_vecs: Pin::from(vec![IoBufMut::new(&mut []); num_sqe].into_boxed_slice()),
                    submitting: 0,
                    added: 0,
                    num_sqes: ring_params.sq_entries as usize,
                }),
                complete_ring,
                in_flight: AtomicUsize::new(0),
                stats: Default::default(),
            })
        }
    }

    /// Asynchronously writes to `fd` from the address given in `ptr`.
    /// # Safety
    /// `add_write` will write up to `len` bytes of data from the address given by `ptr`. This is
    /// only safe if the caller guarantees that the memory lives until the transaction is complete
    /// and that completion has been returned from the `wait` function. In addition there must not
    /// be other references to the data pointed to by `ptr` until the operation completes.  Ensure
    /// that the fd remains open until the op completes as well.
    pub unsafe fn add_write(
        &self,
        ptr: *const u8,
        len: usize,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().add_rw_op(
            ptr,
            len,
            fd,
            offset,
            user_data,
            io_uring_op_IORING_OP_WRITEV as u8,
        )
    }
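
    // Usage sketch of the safety contract above (illustrative; `uring`, `file`, and `buf` are
    // hypothetical): the buffer must stay alive and untouched until the matching completion has
    // been drained from `wait()`.
    //
    //     let buf = [0u8; 4096];
    //     // SAFETY: `buf` outlives the op and is not accessed until user_data 42 completes.
    //     unsafe { uring.add_write(buf.as_ptr(), buf.len(), file.as_raw_fd(), Some(0), 42) }?;
    //     let (user_data, res) = uring.wait()?.next().unwrap();
    //     assert_eq!(user_data, 42);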

    /// Asynchronously reads from `fd` to the address given in `ptr`.
    /// # Safety
    /// `add_read` will write up to `len` bytes of data to the address given by `ptr`. This is only
    /// safe if the caller guarantees there are no other references to that memory and that the
    /// memory lives until the transaction is complete and that completion has been returned from
    /// the `wait` function.  In addition there must not be any mutable references to the data
    /// pointed to by `ptr` until the operation completes.  Ensure that the fd remains open until
    /// the op completes as well.
    pub unsafe fn add_read(
        &self,
        ptr: *mut u8,
        len: usize,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().add_rw_op(
            ptr,
            len,
            fd,
            offset,
            user_data,
            io_uring_op_IORING_OP_READV as u8,
        )
    }

    /// # Safety
    /// Same as `add_writev`, but accepts an iterator of iovecs for callers that don't already have
    /// a boxed slice. The same safety requirements as `add_writev` apply.
    pub unsafe fn add_writev_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_writev(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously writes to `fd` from the addresses given in `iovecs`.
    /// # Safety
    /// `add_writev` will write to the address given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until the
    /// transaction is complete and that completion has been returned from the `wait` function.  In
    /// addition there must not be any mutable references to the data pointed to by `iovecs` until
    /// the operation completes.  Ensure that the fd remains open until the op completes as well.
    /// The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_writev(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_WRITEV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// # Safety
    /// Same as `add_readv`, but accepts an iterator of iovecs for callers that don't already have
    /// a boxed slice. The same safety requirements as `add_readv` apply.
    pub unsafe fn add_readv_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_readv(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously reads from `fd` to the addresses given in `iovecs`.
    /// # Safety
    /// `add_readv` will write to the address given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until the
    /// transaction is complete and that completion has been returned from the `wait` function.  In
    /// addition there must not be any references to the data pointed to by `iovecs` until the
    /// operation completes.  Ensure that the fd remains open until the op completes as well.
    /// The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_readv(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: Option<u64>,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_READV as u8;
            sqe.set_addr(iovecs.as_ptr() as *const _ as *const libc::c_void as u64);
            sqe.len = iovecs.len() as u32;
            sqe.set_off(file_offset_to_raw_offset(offset));
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// Add a no-op operation that doesn't perform any IO. Useful for testing the performance of the
    /// io_uring itself and for waking up a thread that's blocked inside a wait() call.
    pub fn add_nop(&self, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_NOP as u8;
            sqe.fd = -1;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }
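
    // Sketch of the wake-up use mentioned in the doc comment above (illustrative; assumes the
    // context is shared with a thread currently blocked in `wait()`, e.g. behind an `Arc`):
    //
    //     const WAKE_TOKEN: UserData = u64::MAX; // hypothetical sentinel value
    //     uring.add_nop(WAKE_TOKEN)?;
    //     uring.submit()?; // the blocked `wait()` now has a completion to return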

    /// Syncs all completed operations; the ordering with respect to in-flight async ops is not
    /// defined.
    pub fn add_fsync(&self, fd: RawFd, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_FSYNC as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// See the usage of `fallocate`; this asynchronously performs the same operation.
    pub fn add_fallocate(
        &self,
        fd: RawFd,
        offset: u64,
        len: u64,
        mode: u32,
        user_data: UserData,
    ) -> Result<()> {
        // Note that len for fallocate is passed in the addr field of the sqe and the mode uses the
        // len field.
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_FALLOCATE as u8;

            sqe.fd = fd;
            sqe.set_addr(len);
            sqe.len = mode;
            sqe.set_off(offset);
            sqe.user_data = user_data;

            sqe.set_buf_index(0);
            sqe.set_rw_flags(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Adds an FD to be polled based on the given flags.
    /// The user must keep the FD open until the operation completion is returned from
    /// `wait`.
    /// Note that io_uring polls are always one-shot. After the fd is returned, it must be re-added
    /// to get future events.
    pub fn add_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_ADD as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }
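
    // One-shot behavior sketch (illustrative; `uring` and a readable fd `f` are hypothetical):
    // once a poll completion has been consumed, the fd must be added again to observe further
    // events.
    //
    //     uring.add_poll_fd(f.as_raw_fd(), EventType::Read, 7)?;
    //     let (user_data, _events) = uring.wait()?.next().unwrap();
    //     uring.add_poll_fd(f.as_raw_fd(), EventType::Read, user_data)?; // re-arm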

    /// Removes an FD that was previously added with `add_poll_fd`.
    pub fn remove_poll_fd(&self, fd: RawFd, events: EventType, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_POLL_REMOVE as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.set_poll_events(events.into());

            sqe.set_addr(0);
            sqe.len = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Attempt to cancel an already issued request. `addr` must contain the `user_data` field of
    /// the request that should be cancelled. The cancellation request will complete with one of
    /// the following result codes. If found, the res field of the cqe will contain 0. If not
    /// found, res will contain -ENOENT. If found and the cancel was attempted, the res field will
    /// contain -EALREADY. In this case, the request may or may not terminate. In general, requests
    /// that are interruptible (like socket IO) will get cancelled, while disk IO requests cannot
    /// be cancelled if already started.
    pub fn async_cancel(&self, addr: UserData, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = io_uring_op_IORING_OP_ASYNC_CANCEL as u8;
            sqe.user_data = user_data;
            sqe.set_addr(addr);

            sqe.len = 0;
            sqe.fd = 0;
            sqe.set_off(0);
            sqe.set_buf_index(0);
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }
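
    // Sketch of checking a cancel result (illustrative; `uring` is hypothetical): negative `res`
    // values surface from `wait()` as `io::Error`s, so -ENOENT and -EALREADY appear as errors with
    // those raw OS error codes.
    //
    //     uring.async_cancel(42, 99)?; // 42 = user_data of the op to cancel, 99 = this op's token
    //     let (user_data, res) = uring.wait()?.next().unwrap();
    //     if user_data == 99 {
    //         match res.map_err(|e| e.raw_os_error()) {
    //             Ok(_) => { /* cancelled */ }
    //             Err(Some(libc::ENOENT)) => { /* original op not found */ }
    //             Err(Some(libc::EALREADY)) => { /* already running; may still complete */ }
    //             Err(_) => { /* unexpected error */ }
    //         }
    //     }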

    // Calls io_uring_enter, submitting any new sqes that have been added to the submit queue and
    // waiting for `wait_nr` operations to complete.
    fn enter(&self, wait_nr: u64) -> Result<()> {
        let completed = self.complete_ring.num_completed();
        self.stats
            .total_complete
            .fetch_add(completed as u64, Ordering::Relaxed);
        self.in_flight.fetch_sub(completed, Ordering::Relaxed);

        let added = self.submit_ring.lock().prepare_submit();
        if added == 0 && wait_nr == 0 {
            return Ok(());
        }

        self.stats.total_enter_calls.fetch_add(1, Ordering::Relaxed);
        let flags = if wait_nr > 0 {
            IORING_ENTER_GETEVENTS
        } else {
            0
        };
        let res = unsafe {
            // Safe because the only memory modified is in the completion queue.
            io_uring_enter(self.ring_file.as_raw_fd(), added as u64, wait_nr, flags)
        };

        match res {
            Ok(_) => {
                self.submit_ring.lock().complete_submit(added);
                self.stats
                    .total_ops
                    .fetch_add(added as u64, Ordering::Relaxed);

                // Release store synchronizes with acquire load above.
                self.in_flight.fetch_add(added, Ordering::Release);
            }
            Err(e) => {
                // An EBUSY return means that some completed events must be processed before
                // submitting more, so wait for some to finish without pushing the new sqes in
                // that case.
                // An EINTR means we successfully submitted the events but were interrupted while
                // waiting, so just wait again.
                // Any other error should be propagated up.

                if e != libc::EINTR {
                    self.submit_ring.lock().fail_submit(added);
                }

                if wait_nr == 0 || (e != libc::EBUSY && e != libc::EINTR) {
                    return Err(Error::RingEnter(e));
                }

                loop {
                    // Safe because the only memory modified is in the completion queue.
                    let res =
                        unsafe { io_uring_enter(self.ring_file.as_raw_fd(), 0, wait_nr, flags) };
                    if res != Err(libc::EINTR) {
                        return res.map_err(Error::RingEnter);
                    }
                }
            }
        }

        Ok(())
    }

    /// Sends operations added with the `add_*` functions to the kernel.
    pub fn submit(&self) -> Result<()> {
        self.enter(0)
    }

    /// Sends operations added with the `add_*` functions to the kernel and returns an iterator
    /// over any completed operations. `wait` blocks until at least one completion is ready.  If
    /// called without any new events added, this simply waits for any existing events to complete
    /// and returns as soon as one or more is ready.
    pub fn wait(&self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
        // We only want to wait for events if there aren't already events in the completion queue.
        let wait_nr = if self.complete_ring.num_ready() > 0 {
            0
        } else {
            1
        };

        // The completion ring will iterate over all completed ops.
        match self.enter(wait_nr) {
            Ok(()) => Ok(&self.complete_ring),
            // If we cannot submit any more entries then we need to pull stuff out of the completion
            // ring, so just return the completion ring. This can only happen when `wait_nr` is 0 so
            // we know there are already entries in the completion queue.
            Err(Error::RingEnter(libc::EBUSY)) => Ok(&self.complete_ring),
            Err(e) => Err(e),
        }
    }
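
    // End-to-end sketch of the submit/wait flow (illustrative; `uring`, `file`, and `buf` are
    // hypothetical):
    //
    //     // SAFETY: `buf` is not accessed again until its completion has been returned.
    //     unsafe { uring.add_read(buf.as_mut_ptr(), buf.len(), file.as_raw_fd(), Some(0), 1) }?;
    //     uring.submit()?; // push the sqe without blocking
    //     for (user_data, res) in uring.wait()? { // blocks until at least one completion is ready
    //         println!("op {} finished with {:?}", user_data, res);
    //     }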
}

impl AsRawFd for URingContext {
    fn as_raw_fd(&self) -> RawFd {
        self.ring_file.as_raw_fd()
    }
}

impl AsRawDescriptor for URingContext {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.ring_file.as_raw_descriptor()
    }
}

struct SubmitQueueEntries {
    mmap: MemoryMapping,
    len: usize,
}

impl SubmitQueueEntries {
    fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
        if index >= self.len {
            return None;
        }
        let mut_ref = unsafe {
            // Safe because the mut borrow of self restricts to one mutable reference at a time and
            // we trust that the kernel has returned enough memory in io_uring_setup and mmap.
            &mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index)
        };
        // Clear any state.
        *mut_ref = io_uring_sqe::default();
        Some(mut_ref)
    }
}

struct SubmitQueueState {
    _mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    array: AtomicPtr<u32>,
}

impl SubmitQueueState {
    // # Safety
    // Safe iff `mmap` is created by mapping from a uring FD at the SQ_RING offset and params is
    // the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
        let ptr = mmap.as_ptr();
        // Transmutes are safe because a u32 is atomic on all supported architectures and the
        // pointer will live until after self is dropped because the mmap is owned.
        let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
        // This offset is guaranteed to be within the mmap so unwrap the result.
        let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
        let array = AtomicPtr::new(ptr.add(params.sq_off.array as usize) as *mut u32);
        SubmitQueueState {
            _mmap: mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            array,
        }
    }

    // Sets the kernel's array entry at the given `index` to `value`.
    fn set_array_entry(&self, index: usize, value: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to be written.
        unsafe {
            std::ptr::write_volatile(self.array.load(Ordering::Relaxed).add(index), value as u32);
        }
    }
}

#[derive(Default)]
struct CompleteQueueData {
    completed: usize,
    // For ops that pass in arrays of iovecs, they need to be valid for the duration of the
    // operation because the kernel might read them at any time.
    pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
}

pub struct CompleteQueueState {
    mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    cqes_offset: u32,
    data: Mutex<CompleteQueueData>,
}

impl CompleteQueueState {
    /// # Safety
    /// Safe iff `mmap` is created by mapping from a uring FD at the CQ_RING offset and params is
    /// the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
        let ptr = mmap.as_ptr();
        let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
        let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
        CompleteQueueState {
            mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            cqes_offset: params.cq_off.cqes,
            data: Default::default(),
        }
    }

    fn add_op_data(&self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
        self.data.lock().pending_op_addrs.insert(user_data, addrs);
    }

    fn get_cqe(&self, head: u32) -> &io_uring_cqe {
        unsafe {
            // Safe because we trust that the kernel has returned enough memory in io_uring_setup
            // and mmap, and the index is kept within range by the ring_mask.
            let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
                as *const io_uring_cqe;

            let index = head & self.ring_mask;

            &*cqes.add(index as usize)
        }
    }

    pub fn num_ready(&self) -> u32 {
        let tail = self.pointers.tail(Ordering::Acquire);
        let head = self.pointers.head(Ordering::Relaxed);

        tail.saturating_sub(head)
    }

    fn num_completed(&self) -> usize {
        let mut data = self.data.lock();
        ::std::mem::replace(&mut data.completed, 0)
    }

    fn pop_front(&self) -> Option<(UserData, std::io::Result<u32>)> {
        // Take the lock on self.data first so that 2 threads don't try to pop the same completed op
        // from the queue.
        let mut data = self.data.lock();

        // Safe because the pointers to the atomics are valid and the cqe must be in range
        // because the kernel provided mask is applied to the index.
        let head = self.pointers.head(Ordering::Relaxed);

        // Synchronize the read of tail after the read of head.
        if head == self.pointers.tail(Ordering::Acquire) {
            return None;
        }

        data.completed += 1;

        let cqe = self.get_cqe(head);
        let user_data = cqe.user_data;
        let res = cqe.res;

        // Free the addrs saved for this op.
        let _ = data.pending_op_addrs.remove(&user_data);

        // Store the new head and ensure the reads above complete before the kernel sees the
        // update to head; `set_head` uses `Release` ordering.
        let new_head = head.wrapping_add(1);
        self.pointers.set_head(new_head);

        let io_res = match res {
            r if r < 0 => Err(std::io::Error::from_raw_os_error(-r)),
            r => Ok(r as u32),
        };
        Some((user_data, io_res))
    }
}

// Return the completed ops with their result.
impl<'c> Iterator for &'c CompleteQueueState {
    type Item = (UserData, std::io::Result<u32>);

    fn next(&mut self) -> Option<Self::Item> {
        self.pop_front()
    }
}

struct QueuePointers {
    head: *const AtomicU32,
    tail: *const AtomicU32,
}

// Raw pointers don't implement Send or Sync, but in this case both fields point to atomics, so it's
// safe to send the pointers between threads or access them concurrently from multiple threads.
unsafe impl Send for QueuePointers {}
unsafe impl Sync for QueuePointers {}

impl QueuePointers {
    // Loads the tail pointer atomically with the given ordering.
    fn tail(&self, ordering: Ordering) -> u32 {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.tail).load(ordering) }
    }

    // Stores the new value of the tail in the submit queue. This allows the kernel to start
    // processing entries that have been added up until the given tail pointer.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_tail(&self, next_tail: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.tail).store(next_tail, Ordering::Release) }
    }

    // Loads the head pointer atomically with the given ordering.
    fn head(&self, ordering: Ordering) -> u32 {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.head).load(ordering) }
    }

    // Stores the new value of the head pointer. For the completion queue this tells the kernel
    // that entries up to the new head have been consumed and their slots may be reused.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_head(&self, next_head: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.head).store(next_head, Ordering::Release) }
    }
}