// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// This file makes several casts from u8 pointers into more-aligned pointer types.
// We assume that the kernel will give us suitably aligned memory.
#![allow(clippy::cast_ptr_alignment)]

use std::collections::BTreeMap;
use std::fmt;
use std::fs::File;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::pin::Pin;
use std::sync::atomic::{AtomicPtr, AtomicU32, AtomicU64, AtomicUsize, Ordering};

use data_model::IoBufMut;
use sync::Mutex;
use sys_util::{MappedRegion, MemoryMapping, Protection, WatchingEvents};

use crate::bindings::*;
use crate::syscalls::*;

/// Holds per-operation, user-specified data. The usage is up to the caller. The most common use is
/// for callers to identify each request.
pub type UserData = u64;

#[derive(Debug)]
pub enum Error {
    /// The call to `io_uring_enter` failed with the given errno.
    RingEnter(libc::c_int),
    /// The call to `io_uring_setup` failed with the given errno.
    Setup(libc::c_int),
    /// Failed to map the completion ring.
    MappingCompleteRing(sys_util::MmapError),
    /// Failed to map the submit ring.
    MappingSubmitRing(sys_util::MmapError),
    /// Failed to map submit entries.
    MappingSubmitEntries(sys_util::MmapError),
    /// Too many ops are already queued.
    NoSpace,
}
pub type Result<T> = std::result::Result<T, Error>;

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        use self::Error::*;

        match self {
            RingEnter(e) => write!(f, "Failed to enter io uring: {}", e),
            Setup(e) => write!(f, "Failed to setup io uring: {}", e),
            MappingCompleteRing(e) => write!(f, "Failed to mmap completion ring: {}", e),
            MappingSubmitRing(e) => write!(f, "Failed to mmap submit ring: {}", e),
            MappingSubmitEntries(e) => write!(f, "Failed to mmap submit entries: {}", e),
            NoSpace => write!(
                f,
                "No space for more ring entries, try increasing the size passed to `new`",
            ),
        }
    }
}

/// Basic statistics about the operations that have been submitted to the uring.
#[derive(Default)]
pub struct URingStats {
    total_enter_calls: AtomicU64, // Number of times the uring has been entered.
    total_ops: AtomicU64,         // Total ops submitted to io_uring.
    total_complete: AtomicU64,    // Total ops completed by io_uring.
}

struct SubmitQueue {
    submit_ring: SubmitQueueState,
    submit_queue_entries: SubmitQueueEntries,
    io_vecs: Pin<Box<[IoBufMut<'static>]>>,
    submitting: usize, // The number of ops in the process of being submitted.
    added: usize,      // The number of ops added since the last call to `io_uring_enter`.
    num_sqes: usize,   // The total number of sqes allocated in shared memory.
}

impl SubmitQueue {
    // Call `f` with the next available sqe or return an error if none are available.
    // After `f` returns, the sqe is appended to the kernel's queue.
    fn prep_next_sqe<F>(&mut self, mut f: F) -> Result<()>
    where
        F: FnMut(&mut io_uring_sqe, &mut libc::iovec),
    {
        if self.added == self.num_sqes {
            return Err(Error::NoSpace);
        }

        // Find the next free submission entry in the submit ring and fill it with an iovec.
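        // Illustrative note (added commentary, not in the original source): the kernel sizes the
        // ring to a power of two and reports `ring_mask = size - 1`. With 16 sqes, for example,
        // ring_mask is 15, so a tail counter of 18 selects slot 18 & 15 = 2; the counters
        // themselves only wrap when the u32 overflows, which `wrapping_add` below handles.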
        // The below raw pointer derefs are safe because the memory the pointers use lives as long
        // as the mmap in self.
        let tail = self.submit_ring.pointers.tail(Ordering::Relaxed);
        let next_tail = tail.wrapping_add(1);
        if next_tail == self.submit_ring.pointers.head(Ordering::Acquire) {
            return Err(Error::NoSpace);
        }
        // `tail` is the next sqe to use.
        let index = (tail & self.submit_ring.ring_mask) as usize;
        let sqe = self.submit_queue_entries.get_mut(index).unwrap();

        f(sqe, self.io_vecs[index].as_mut());

        // Tells the kernel to use the new index when processing the entry at that index.
        self.submit_ring.set_array_entry(index, index as u32);
        // Ensure the above writes to sqe are seen before the tail is updated.
        // set_tail uses Release ordering when storing to the ring.
        self.submit_ring.pointers.set_tail(next_tail);

        self.added += 1;

        Ok(())
    }

    // Returns the number of entries that have been added to this SubmitQueue since the last time
    // `prepare_submit` was called.
    fn prepare_submit(&mut self) -> usize {
        let out = self.added - self.submitting;
        self.submitting = self.added;

        out
    }

    // Indicates that we failed to submit `count` entries to the kernel and that they should be
    // retried.
    fn fail_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
    }

    // Indicates that `count` entries have been submitted to the kernel and so the space may be
    // reused for new entries.
    fn complete_submit(&mut self, count: usize) {
        debug_assert!(count <= self.submitting);
        self.submitting -= count;
        self.added -= count;
    }

    unsafe fn add_rw_op(
        &mut self,
        ptr: *const u8,
        len: usize,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
        op: u8,
    ) -> Result<()> {
        self.prep_next_sqe(|sqe, iovec| {
            iovec.iov_base = ptr as *const libc::c_void as *mut _;
            iovec.iov_len = len;
            sqe.opcode = op;
            sqe.addr = iovec as *const _ as *const libc::c_void as u64;
            sqe.len = 1;
            sqe.__bindgen_anon_1.off = offset;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;

        Ok(())
    }
}

/// Unsafe wrapper for the kernel's io_uring interface. Allows for queueing multiple I/O operations
/// to the kernel and asynchronously handling the completion of these operations.
/// Use the various `add_*` functions to configure operations, then call `wait` to start
/// the operations and get any completed results. Each op is given a u64 user_data argument that is
/// used to identify the result when returned in the iterator provided by `wait`.
///
/// # Example polling an FD for readable status.
///
/// ```
/// # use std::fs::File;
/// # use std::os::unix::io::AsRawFd;
/// # use std::path::Path;
/// # use sys_util::WatchingEvents;
/// # use io_uring::URingContext;
/// let f = File::open(Path::new("/dev/zero")).unwrap();
/// let uring = URingContext::new(16).unwrap();
/// uring
///     .add_poll_fd(f.as_raw_fd(), &WatchingEvents::empty().set_read(), 454)
///     .unwrap();
/// let (user_data, res) = uring.wait().unwrap().next().unwrap();
/// assert_eq!(user_data, 454 as io_uring::UserData);
/// assert_eq!(res.unwrap(), 1 as u32);
///
/// ```
pub struct URingContext {
    ring_file: File, // Holds the io_uring context FD returned from io_uring_setup.
    submit_ring: Mutex<SubmitQueue>,
    complete_ring: CompleteQueueState,
    in_flight: AtomicUsize, // The number of pending operations.
    stats: URingStats,
}

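// Added commentary (not in the original source): `URingContext::new` maps three regions that are
// shared with the kernel: the submit ring metadata (head/tail/index array) at IORING_OFF_SQ_RING,
// the array of `io_uring_sqe` entries at IORING_OFF_SQES, and the completion ring of
// `io_uring_cqe`s at IORING_OFF_CQ_RING. `SubmitQueueState`, `SubmitQueueEntries`, and
// `CompleteQueueState` below each own one of these mappings.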
impl URingContext {
    /// Creates a `URingContext` where the underlying uring has space for `num_entries`
    /// simultaneous operations.
    pub fn new(num_entries: usize) -> Result<URingContext> {
        let ring_params = io_uring_params::default();
        // The below unsafe block isolates the creation of the URingContext. Each step on its own
        // is unsafe. Using the uring FD for the mapping and the offsets returned by the kernel for
        // base addresses maintains safety guarantees assuming the kernel API guarantees are
        // trusted.
        unsafe {
            // Safe because the kernel is trusted to only modify params and `File` is created with
            // an FD that it takes complete ownership of.
            let fd = io_uring_setup(num_entries, &ring_params).map_err(Error::Setup)?;
            let ring_file = File::from_raw_fd(fd);

            // Mmap the submit and completion queues.
            // Safe because we trust the kernel to set valid sizes in `io_uring_setup` and any
            // error is checked.
            let submit_ring = SubmitQueueState::new(
                MemoryMapping::from_fd_offset_protection_populate(
                    &ring_file,
                    ring_params.sq_off.array as usize
                        + ring_params.sq_entries as usize * std::mem::size_of::<u32>(),
                    u64::from(IORING_OFF_SQ_RING),
                    Protection::read_write(),
                    true,
                )
                .map_err(Error::MappingSubmitRing)?,
                &ring_params,
            );

            let num_sqe = ring_params.sq_entries as usize;
            let submit_queue_entries = SubmitQueueEntries {
                mmap: MemoryMapping::from_fd_offset_protection_populate(
                    &ring_file,
                    ring_params.sq_entries as usize * std::mem::size_of::<io_uring_sqe>(),
                    u64::from(IORING_OFF_SQES),
                    Protection::read_write(),
                    true,
                )
                .map_err(Error::MappingSubmitEntries)?,
                len: num_sqe,
            };

            let complete_ring = CompleteQueueState::new(
                MemoryMapping::from_fd_offset_protection_populate(
                    &ring_file,
                    ring_params.cq_off.cqes as usize
                        + ring_params.cq_entries as usize * std::mem::size_of::<io_uring_cqe>(),
                    u64::from(IORING_OFF_CQ_RING),
                    Protection::read_write(),
                    true,
                )
                .map_err(Error::MappingCompleteRing)?,
                &ring_params,
            );

            Ok(URingContext {
                ring_file,
                submit_ring: Mutex::new(SubmitQueue {
                    submit_ring,
                    submit_queue_entries,
                    io_vecs: Pin::from(vec![IoBufMut::new(&mut []); num_sqe].into_boxed_slice()),
                    submitting: 0,
                    added: 0,
                    num_sqes: ring_params.sq_entries as usize,
                }),
                complete_ring,
                in_flight: AtomicUsize::new(0),
                stats: Default::default(),
            })
        }
    }

    /// Asynchronously writes to `fd` from the address given in `ptr`.
    /// # Safety
    /// `add_write` will write up to `len` bytes of data from the address given by `ptr`. This is
    /// only safe if the caller guarantees that the memory lives until the transaction is complete
    /// and that completion has been returned from the `wait` function. In addition there must not
    /// be other references to the data pointed to by `ptr` until the operation completes. Ensure
    /// that the fd remains open until the op completes as well.
    pub unsafe fn add_write(
        &self,
        ptr: *const u8,
        len: usize,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring
            .lock()
            .add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_WRITEV as u8)
    }

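    // Illustrative sketch (added commentary, not from the original source): a typical caller
    // drives `add_write` roughly like this, keeping `buf` and `file` alive until the matching
    // completion has been returned from `wait`:
    //
    //     unsafe { uring.add_write(buf.as_ptr(), buf.len(), file.as_raw_fd(), 0, 7)? };
    //     uring.submit()?; // or rely on `wait` to submit
    //     for (user_data, res) in uring.wait()? {
    //         assert_eq!(user_data, 7);
    //         let bytes_written = res.expect("write failed");
    //     }
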
    /// Asynchronously reads from `fd` to the address given in `ptr`.
    /// # Safety
    /// `add_read` will write up to `len` bytes of data to the address given by `ptr`. This is only
    /// safe if the caller guarantees there are no other references to that memory and that the
    /// memory lives until the transaction is complete and that completion has been returned from
    /// the `wait` function. In addition there must not be any mutable references to the data
    /// pointed to by `ptr` until the operation completes. Ensure that the fd remains open until
    /// the op completes as well.
    pub unsafe fn add_read(
        &self,
        ptr: *mut u8,
        len: usize,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring
            .lock()
            .add_rw_op(ptr, len, fd, offset, user_data, IORING_OP_READV as u8)
    }

    /// See `add_writev`, but accepts an iterator instead of a vector if there isn't already a
    /// vector in existence.
    pub unsafe fn add_writev_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_writev(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously writes to `fd` from the addresses given in `iovecs`.
    /// # Safety
    /// `add_writev` will read from the addresses given by `iovecs`. This is only safe if the
    /// caller guarantees there are no other references to that memory and that the memory lives
    /// until the transaction is complete and that completion has been returned from the `wait`
    /// function. In addition there must not be any mutable references to the data pointed to by
    /// `iovecs` until the operation completes. Ensure that the fd remains open until the op
    /// completes as well.
    /// The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_writev(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_WRITEV as u8;
            sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64;
            sqe.len = iovecs.len() as u32;
            sqe.__bindgen_anon_1.off = offset;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// See `add_readv`, but accepts an iterator instead of a vector if there isn't already a
    /// vector in existence.
    pub unsafe fn add_readv_iter<I>(
        &self,
        iovecs: I,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()>
    where
        I: Iterator<Item = libc::iovec>,
    {
        self.add_readv(
            Pin::from(
                // Safe because the caller is required to guarantee that the memory pointed to by
                // `iovecs` lives until the transaction is complete and the completion has been
                // returned from `wait()`.
                iovecs
                    .map(|iov| IoBufMut::from_raw_parts(iov.iov_base as *mut u8, iov.iov_len))
                    .collect::<Vec<_>>()
                    .into_boxed_slice(),
            ),
            fd,
            offset,
            user_data,
        )
    }

    /// Asynchronously reads from `fd` to the addresses given in `iovecs`.
    /// # Safety
    /// `add_readv` will write to the addresses given by `iovecs`. This is only safe if the caller
    /// guarantees there are no other references to that memory and that the memory lives until the
    /// transaction is complete and that completion has been returned from the `wait` function. In
    /// addition there must not be any references to the data pointed to by `iovecs` until the
    /// operation completes. Ensure that the fd remains open until the op completes as well.
    /// The iovecs reference must be kept alive until the op returns.
    pub unsafe fn add_readv(
        &self,
        iovecs: Pin<Box<[IoBufMut<'static>]>>,
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_READV as u8;
            sqe.addr = iovecs.as_ptr() as *const _ as *const libc::c_void as u64;
            sqe.len = iovecs.len() as u32;
            sqe.__bindgen_anon_1.off = offset;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.ioprio = 0;
            sqe.user_data = user_data;
            sqe.flags = 0;
            sqe.fd = fd;
        })?;
        self.complete_ring.add_op_data(user_data, iovecs);
        Ok(())
    }

    /// Add a no-op operation that doesn't perform any IO. Useful for testing the performance of
    /// the io_uring itself and for waking up a thread that's blocked inside a `wait()` call.
    pub fn add_nop(&self, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_NOP as u8;
            sqe.fd = -1;
            sqe.user_data = user_data;

            sqe.addr = 0;
            sqe.len = 0;
            sqe.__bindgen_anon_1.off = 0;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.__bindgen_anon_2.rw_flags = 0;
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

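    // Added commentary (not in the original source): because `wait` blocks until at least one
    // completion is ready, `add_nop` is the intended way to wake a thread that is parked in
    // `wait` from another thread, e.g.:
    //
    //     uring.add_nop(WAKE_TOKEN)?; // WAKE_TOKEN is any UserData the waiter recognizes.
    //     uring.submit()?;
    //
    // The `wake_with_nop` test below exercises exactly this pattern.
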
    /// Syncs all completed operations. The ordering with in-flight async ops is not
    /// defined.
    pub fn add_fsync(&self, fd: RawFd, user_data: UserData) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_FSYNC as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;

            sqe.addr = 0;
            sqe.len = 0;
            sqe.__bindgen_anon_1.off = 0;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.__bindgen_anon_2.rw_flags = 0;
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// See the usage of `fallocate`; this asynchronously performs the same operation.
    pub fn add_fallocate(
        &self,
        fd: RawFd,
        offset: u64,
        len: u64,
        mode: u32,
        user_data: UserData,
    ) -> Result<()> {
        // Note that len for fallocate is passed in the addr field of the sqe and the mode uses the
        // len field.
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_FALLOCATE as u8;

            sqe.fd = fd;
            sqe.addr = len;
            sqe.len = mode;
            sqe.__bindgen_anon_1.off = offset;
            sqe.user_data = user_data;

            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.__bindgen_anon_2.rw_flags = 0;
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Adds an FD to be polled based on the given flags.
    /// The user must keep the FD open until the operation completion is returned from
    /// `wait`.
    /// Note that io_uring is always a one-shot poll. After the fd is returned, it must be re-added
    /// to get future events.
    pub fn add_poll_fd(
        &self,
        fd: RawFd,
        events: &WatchingEvents,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_POLL_ADD as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.__bindgen_anon_2.poll_events = events.get_raw() as u16;

            sqe.addr = 0;
            sqe.len = 0;
            sqe.__bindgen_anon_1.off = 0;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    /// Removes an FD that was previously added with `add_poll_fd`.
    pub fn remove_poll_fd(
        &self,
        fd: RawFd,
        events: &WatchingEvents,
        user_data: UserData,
    ) -> Result<()> {
        self.submit_ring.lock().prep_next_sqe(|sqe, _iovec| {
            sqe.opcode = IORING_OP_POLL_REMOVE as u8;
            sqe.fd = fd;
            sqe.user_data = user_data;
            sqe.__bindgen_anon_2.poll_events = events.get_raw() as u16;

            sqe.addr = 0;
            sqe.len = 0;
            sqe.__bindgen_anon_1.off = 0;
            sqe.__bindgen_anon_3.__bindgen_anon_1.buf_index = 0;
            sqe.ioprio = 0;
            sqe.flags = 0;
        })
    }

    // Calls io_uring_enter, submitting any new sqes that have been added to the submit queue and
    // waiting for `wait_nr` operations to complete.
    fn enter(&self, wait_nr: u64) -> Result<()> {
        let completed = self.complete_ring.num_completed();
        self.stats
            .total_complete
            .fetch_add(completed as u64, Ordering::Relaxed);
        self.in_flight.fetch_sub(completed, Ordering::Relaxed);

        let added = self.submit_ring.lock().prepare_submit();
        if added == 0 && wait_nr == 0 {
            return Ok(());
        }

        self.stats.total_enter_calls.fetch_add(1, Ordering::Relaxed);
        let flags = if wait_nr > 0 {
            IORING_ENTER_GETEVENTS
        } else {
            0
        };
        let res = unsafe {
            // Safe because the only memory modified is in the completion queue.
            io_uring_enter(self.ring_file.as_raw_fd(), added as u64, wait_nr, flags)
        };

        match res {
            Ok(_) => {
                self.submit_ring.lock().complete_submit(added);
                self.stats
                    .total_ops
                    .fetch_add(added as u64, Ordering::Relaxed);

                // Release store synchronizes with acquire load above.
                self.in_flight.fetch_add(added, Ordering::Release);
            }
            Err(e) => {
                self.submit_ring.lock().fail_submit(added);

                if wait_nr == 0 || e != libc::EBUSY {
                    return Err(Error::RingEnter(e));
                }

                // An EBUSY return means that some completed events must be processed before
                // submitting more; wait for some to finish without pushing the new sqes in
                // that case.
                unsafe {
                    io_uring_enter(self.ring_file.as_raw_fd(), 0, wait_nr, flags)
                        .map_err(Error::RingEnter)?;
                }
            }
        }

        Ok(())
    }

    /// Sends operations added with the `add_*` functions to the kernel.
    pub fn submit(&self) -> Result<()> {
        self.enter(0)
    }

    /// Sends operations added with the `add_*` functions to the kernel and returns an iterator to
    /// any completed operations. `wait` blocks until at least one completion is ready. If called
    /// without any new events added, this simply waits for any existing events to complete and
    /// returns as soon as one or more is ready.
    pub fn wait(&self) -> Result<impl Iterator<Item = (UserData, std::io::Result<u32>)> + '_> {
        // We only want to wait for events if there aren't already events in the completion queue.
        let wait_nr = if self.complete_ring.num_ready() > 0 {
            0
        } else {
            1
        };

        // The CompletionQueue will iterate all completed ops.
        match self.enter(wait_nr) {
            Ok(()) => Ok(&self.complete_ring),
            // If we cannot submit any more entries then we need to pull stuff out of the
            // completion ring, so just return the completion ring. This can only happen when
            // `wait_nr` is 0 so we know there are already entries in the completion queue.
            Err(Error::RingEnter(libc::EBUSY)) => Ok(&self.complete_ring),
            Err(e) => Err(e),
        }
    }
}

impl AsRawFd for URingContext {
    fn as_raw_fd(&self) -> RawFd {
        self.ring_file.as_raw_fd()
    }
}

struct SubmitQueueEntries {
    mmap: MemoryMapping,
    len: usize,
}

impl SubmitQueueEntries {
    fn get_mut(&mut self, index: usize) -> Option<&mut io_uring_sqe> {
        if index >= self.len {
            return None;
        }
        let mut_ref = unsafe {
            // Safe because the mut borrow of self restricts us to one mutable reference at a time
            // and we trust that the kernel has returned enough memory in io_uring_setup and mmap.
            &mut *(self.mmap.as_ptr() as *mut io_uring_sqe).add(index)
        };
        // Clear any state.
        *mut_ref = io_uring_sqe::default();
        Some(mut_ref)
    }
}

struct SubmitQueueState {
    _mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    array: AtomicPtr<u32>,
}

impl SubmitQueueState {
    // # Safety
    // Safe iff `mmap` is created by mapping from a uring FD at the SQ_RING offset and params is
    // the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> SubmitQueueState {
        let ptr = mmap.as_ptr();
        // Transmutes are safe because a u32 is atomic on all supported architectures and the
        // pointer will live until after self is dropped because the mmap is owned.
        let head = ptr.add(params.sq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.sq_off.tail as usize) as *const AtomicU32;
        // This offset is guaranteed to be within the mmap so unwrap the result.
        let ring_mask = mmap.read_obj(params.sq_off.ring_mask as usize).unwrap();
        let array = AtomicPtr::new(ptr.add(params.sq_off.array as usize) as *mut u32);
        SubmitQueueState {
            _mmap: mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            array,
        }
    }

    // Sets the kernel's array entry at the given `index` to `value`.
    fn set_array_entry(&self, index: usize, value: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to be written.
        unsafe {
            std::ptr::write_volatile(self.array.load(Ordering::Relaxed).add(index), value as u32);
        }
    }
}

#[derive(Default)]
struct CompleteQueueData {
    completed: usize,
    // For ops that pass in arrays of iovecs, they need to be valid for the duration of the
    // operation because the kernel might read them at any time.
    pending_op_addrs: BTreeMap<UserData, Pin<Box<[IoBufMut<'static>]>>>,
}

struct CompleteQueueState {
    mmap: MemoryMapping,
    pointers: QueuePointers,
    ring_mask: u32,
    cqes_offset: u32,
    data: Mutex<CompleteQueueData>,
}

impl CompleteQueueState {
    /// # Safety
    /// Safe iff `mmap` is created by mapping from a uring FD at the CQ_RING offset and params is
    /// the params struct passed to io_uring_setup.
    unsafe fn new(mmap: MemoryMapping, params: &io_uring_params) -> CompleteQueueState {
        let ptr = mmap.as_ptr();
        let head = ptr.add(params.cq_off.head as usize) as *const AtomicU32;
        let tail = ptr.add(params.cq_off.tail as usize) as *const AtomicU32;
        let ring_mask = mmap.read_obj(params.cq_off.ring_mask as usize).unwrap();
        CompleteQueueState {
            mmap,
            pointers: QueuePointers { head, tail },
            ring_mask,
            cqes_offset: params.cq_off.cqes,
            data: Default::default(),
        }
    }

    fn add_op_data(&self, user_data: UserData, addrs: Pin<Box<[IoBufMut<'static>]>>) {
        self.data.lock().pending_op_addrs.insert(user_data, addrs);
    }

    fn get_cqe(&self, head: u32) -> &io_uring_cqe {
        unsafe {
            // Safe because we trust that the kernel has returned enough memory in io_uring_setup
            // and mmap and the index is checked within range by the ring_mask.
            let cqes = (self.mmap.as_ptr() as *const u8).add(self.cqes_offset as usize)
                as *const io_uring_cqe;

            let index = head & self.ring_mask;

            &*cqes.add(index as usize)
        }
    }

    fn num_ready(&self) -> u32 {
        let tail = self.pointers.tail(Ordering::Acquire);
        let head = self.pointers.head(Ordering::Relaxed);

        tail.saturating_sub(head)
    }

    fn num_completed(&self) -> usize {
        let mut data = self.data.lock();
        ::std::mem::replace(&mut data.completed, 0)
    }

    fn pop_front(&self) -> Option<(UserData, std::io::Result<u32>)> {
        // Take the lock on self.data first so that 2 threads don't try to pop the same completed
        // op from the queue.
        let mut data = self.data.lock();

        // Safe because the pointers to the atomics are valid and the cqe must be in range
        // because the kernel provided mask is applied to the index.
        let head = self.pointers.head(Ordering::Relaxed);

        // Synchronize the read of tail after the read of head.
        if head == self.pointers.tail(Ordering::Acquire) {
            return None;
        }

        data.completed += 1;

        let cqe = self.get_cqe(head);
        let user_data = cqe.user_data;
        let res = cqe.res;

        // Free the addrs saved for this op.
        let _ = data.pending_op_addrs.remove(&user_data);

        // Store the new head and ensure the reads above complete before the kernel sees the
        // update to head; `set_head` uses `Release` ordering.
        let new_head = head.wrapping_add(1);
        self.pointers.set_head(new_head);

        let io_res = match res {
            r if r < 0 => Err(std::io::Error::from_raw_os_error(-r)),
            r => Ok(r as u32),
        };
        Some((user_data, io_res))
    }
}

// Return the completed ops with their result.
impl<'c> Iterator for &'c CompleteQueueState {
    type Item = (UserData, std::io::Result<u32>);

    fn next(&mut self) -> Option<Self::Item> {
        self.pop_front()
    }
}

struct QueuePointers {
    head: *const AtomicU32,
    tail: *const AtomicU32,
}

// Rust pointers don't implement Send or Sync but in this case both fields are atomics and so it's
// safe to send the pointers between threads or access them concurrently from multiple threads.
unsafe impl Send for QueuePointers {}
unsafe impl Sync for QueuePointers {}

impl QueuePointers {
    // Loads the tail pointer atomically with the given ordering.
    fn tail(&self, ordering: Ordering) -> u32 {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.tail).load(ordering) }
    }

    // Stores the new value of the tail in the submit queue. This allows the kernel to start
    // processing entries that have been added up until the given tail pointer.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_tail(&self, next_tail: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.tail).store(next_tail, Ordering::Release) }
    }

    // Loads the head pointer atomically with the given ordering.
    fn head(&self, ordering: Ordering) -> u32 {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read.
        unsafe { (*self.head).load(ordering) }
    }

    // Stores the new value of the head in the completion queue. This tells the kernel that the
    // entries up to (but not including) the given head have been consumed and may be reused.
    // Always stores with release ordering as that is the only valid way to use the pointer.
    fn set_head(&self, next_head: u32) {
        // Safe because self being constructed from the correct mmap guarantees that the memory is
        // valid to read and it's used as an atomic to cover mutability concerns.
        unsafe { (*self.head).store(next_head, Ordering::Release) }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;
    use std::fs::OpenOptions;
    use std::io::{IoSlice, IoSliceMut};
    use std::io::{Read, Seek, SeekFrom, Write};
    use std::mem;
    use std::path::{Path, PathBuf};
    use std::sync::mpsc::channel;
    use std::sync::{Arc, Barrier};
    use std::thread;
    use std::time::Duration;

    use sync::{Condvar, Mutex};
    use sys_util::{pipe, PollContext};
    use tempfile::{tempfile, TempDir};

    use super::*;

    fn append_file_name(path: &Path, name: &str) -> PathBuf {
        let mut joined = path.to_path_buf();
        joined.push(name);
        joined
    }

    fn check_one_read(
        uring: &URingContext,
        buf: &mut [u8],
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) {
        let (user_data_ret, res) = unsafe {
            // Safe because the `wait` call waits until the kernel is done with `buf`.
            uring
                .add_read(buf.as_mut_ptr(), buf.len(), fd, offset, user_data)
                .unwrap();
            uring.wait().unwrap().next().unwrap()
        };
        assert_eq!(user_data_ret, user_data);
        assert_eq!(res.unwrap(), buf.len() as u32);
    }

    fn check_one_readv(
        uring: &URingContext,
        buf: &mut [u8],
        fd: RawFd,
        offset: u64,
        user_data: UserData,
    ) {
        let io_vecs = unsafe {
            // Safe to transmute from IoSliceMut to iovec.
            vec![IoSliceMut::new(buf)]
                .into_iter()
                .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
        };
        let (user_data_ret, res) = unsafe {
            // Safe because the `wait` call waits until the kernel is done with `buf`.
            uring
                .add_readv_iter(io_vecs, fd, offset, user_data)
                .unwrap();
            uring.wait().unwrap().next().unwrap()
        };
        assert_eq!(user_data_ret, user_data);
        assert_eq!(res.unwrap(), buf.len() as u32);
    }

    fn create_test_file(size: u64) -> std::fs::File {
        let f = tempfile().unwrap();
        f.set_len(size).unwrap();
        f
    }

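    // Illustrative test added by the editor (not part of the original suite): it demonstrates
    // the `Error::NoSpace` behavior of the `add_*` functions when the submit queue fills up,
    // assuming the kernel allocates exactly the requested (power-of-two) number of sqes for
    // `URingContext::new(4)`.
    #[test]
    fn no_space_when_queue_is_full() {
        let uring = URingContext::new(4).unwrap();

        // The ring holds 4 sqes, so the first 4 adds succeed and the 5th is rejected.
        for i in 0..4 {
            uring.add_nop(i as UserData).unwrap();
        }
        match uring.add_nop(4) {
            Err(Error::NoSpace) => (),
            Ok(()) => panic!("expected NoSpace once the submit queue is full"),
            Err(e) => panic!("unexpected error: {}", e),
        }

        // Submitting frees the sqe slots; collect the 4 completions and then add again.
        uring.submit().unwrap();
        let mut completed = 0;
        while completed < 4 {
            for (_, res) in uring.wait().unwrap() {
                assert_eq!(res.unwrap(), 0);
                completed += 1;
            }
        }
        uring.add_nop(4).unwrap();
    }
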
    #[test]
    // Queue as many reads as possible and then collect the completions.
    fn read_parallel() {
        const QUEUE_SIZE: usize = 10;
        const BUF_SIZE: usize = 0x1000;

        let uring = URingContext::new(QUEUE_SIZE).unwrap();
        let mut buf = [0u8; BUF_SIZE * QUEUE_SIZE];
        let f = create_test_file((BUF_SIZE * QUEUE_SIZE) as u64);

        // Check that the whole file can be read and that the queue's wrapping is handled by
        // reading many times the queue depth of buffers.
        for i in 0..QUEUE_SIZE * 64 {
            let index = i as u64;
            unsafe {
                let offset = (i % QUEUE_SIZE) * BUF_SIZE;
                match uring.add_read(
                    buf[offset..].as_mut_ptr(),
                    BUF_SIZE,
                    f.as_raw_fd(),
                    offset as u64,
                    index,
                ) {
                    Ok(_) => (),
                    Err(Error::NoSpace) => {
                        let _ = uring.wait().unwrap().next().unwrap();
                    }
                    Err(_) => panic!("unexpected error from uring wait"),
                }
            }
        }
    }

    #[test]
    fn read_readv() {
        let queue_size = 128;

        let uring = URingContext::new(queue_size).unwrap();
        let mut buf = [0u8; 0x1000];
        let f = create_test_file(0x1000 * 2);

        // Check that the whole file can be read and that the queue's wrapping is handled by
        // reading double the queue depth of buffers.
        for i in 0..queue_size * 2 {
            let index = i as u64;
            check_one_read(&uring, &mut buf, f.as_raw_fd(), (index % 2) * 0x1000, index);
            check_one_readv(&uring, &mut buf, f.as_raw_fd(), (index % 2) * 0x1000, index);
        }
    }

    #[test]
    fn readv_vec() {
        let queue_size = 128;
        const BUF_SIZE: usize = 0x2000;

        let uring = URingContext::new(queue_size).unwrap();
        let mut buf = [0u8; BUF_SIZE];
        let mut buf2 = [0u8; BUF_SIZE];
        let mut buf3 = [0u8; BUF_SIZE];
        let io_vecs = unsafe {
            // Safe to transmute from IoSliceMut to iovec.
            vec![
                IoSliceMut::new(&mut buf),
                IoSliceMut::new(&mut buf2),
                IoSliceMut::new(&mut buf3),
            ]
            .into_iter()
            .map(|slice| std::mem::transmute::<IoSliceMut, libc::iovec>(slice))
            .collect::<Vec<libc::iovec>>()
        };
        let total_len = io_vecs.iter().fold(0, |a, iovec| a + iovec.iov_len);
        let f = create_test_file(total_len as u64 * 2);
        let (user_data_ret, res) = unsafe {
            // Safe because the `wait` call waits until the kernel is done with the buffers.
            uring
                .add_readv_iter(io_vecs.into_iter(), f.as_raw_fd(), 0, 55)
                .unwrap();
            uring.wait().unwrap().next().unwrap()
        };
        assert_eq!(user_data_ret, 55);
        assert_eq!(res.unwrap(), total_len as u32);
    }

    #[test]
    fn write_one_block() {
        let uring = URingContext::new(16).unwrap();
        let mut buf = [0u8; 4096];
        let mut f = create_test_file(0);
        f.write(&buf).unwrap();
        f.write(&buf).unwrap();

        unsafe {
            // Safe because the `wait` call waits until the kernel is done mutating `buf`.
            uring
                .add_write(buf.as_mut_ptr(), buf.len(), f.as_raw_fd(), 0, 55)
                .unwrap();
            let (user_data, res) = uring.wait().unwrap().next().unwrap();
            assert_eq!(user_data, 55_u64);
            assert_eq!(res.unwrap(), buf.len() as u32);
        }
    }

    #[test]
    fn write_one_submit_poll() {
        let uring = URingContext::new(16).unwrap();
        let mut buf = [0u8; 4096];
        let mut f = create_test_file(0);
        f.write(&buf).unwrap();
        f.write(&buf).unwrap();

        let ctx: PollContext<u64> = PollContext::build_with(&[(&uring, 1)]).unwrap();
        {
            // Test that the uring context isn't readable before any events are complete.
            let events = ctx.wait_timeout(Duration::from_millis(1)).unwrap();
            assert!(events.iter_readable().next().is_none());
        }

        unsafe {
            // Safe because the `wait` call waits until the kernel is done mutating `buf`.
            uring
                .add_write(buf.as_mut_ptr(), buf.len(), f.as_raw_fd(), 0, 55)
                .unwrap();
            uring.submit().unwrap();
            // Poll for completion with epoll.
            let events = ctx.wait().unwrap();
            let event = events.iter_readable().next().unwrap();
            assert_eq!(event.token(), 1);
            let (user_data, res) = uring.wait().unwrap().next().unwrap();
            assert_eq!(user_data, 55_u64);
            assert_eq!(res.unwrap(), buf.len() as u32);
        }
    }

    #[test]
    fn writev_vec() {
        let queue_size = 128;
        const BUF_SIZE: usize = 0x2000;
        const OFFSET: u64 = 0x2000;

        let uring = URingContext::new(queue_size).unwrap();
        let buf = [0xaau8; BUF_SIZE];
        let buf2 = [0xffu8; BUF_SIZE];
        let buf3 = [0x55u8; BUF_SIZE];
        let io_vecs = unsafe {
            // Safe to transmute from IoSlice to iovec.
            vec![IoSlice::new(&buf), IoSlice::new(&buf2), IoSlice::new(&buf3)]
                .into_iter()
                .map(|slice| std::mem::transmute::<IoSlice, libc::iovec>(slice))
                .collect::<Vec<libc::iovec>>()
        };
        let total_len = io_vecs.iter().fold(0, |a, iovec| a + iovec.iov_len);
        let mut f = create_test_file(total_len as u64 * 2);
        let (user_data_ret, res) = unsafe {
            // Safe because the `wait` call waits until the kernel is done with the buffers.
            uring
                .add_writev_iter(io_vecs.into_iter(), f.as_raw_fd(), OFFSET, 55)
                .unwrap();
            uring.wait().unwrap().next().unwrap()
        };
        assert_eq!(user_data_ret, 55);
        assert_eq!(res.unwrap(), total_len as u32);

        let mut read_back = [0u8; BUF_SIZE];
        f.seek(SeekFrom::Start(OFFSET)).unwrap();
        f.read(&mut read_back).unwrap();
        assert!(!read_back.iter().any(|&b| b != 0xaa));
        f.read(&mut read_back).unwrap();
        assert!(!read_back.iter().any(|&b| b != 0xff));
        f.read(&mut read_back).unwrap();
        assert!(!read_back.iter().any(|&b| b != 0x55));
    }

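    // Illustrative test added by the editor (not part of the original suite): the smallest
    // possible round trip through the ring. A NOP completes with a result of 0 and hands back
    // the user_data it was queued with.
    #[test]
    fn nop_round_trip() {
        let uring = URingContext::new(16).unwrap();
        uring.add_nop(77).unwrap();
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 77_u64);
        assert_eq!(res.unwrap(), 0_u32);
    }
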
    #[test]
    fn fallocate_fsync() {
        let tempdir = TempDir::new().unwrap();
        let file_path = append_file_name(tempdir.path(), "test");

        {
            let buf = [0u8; 4096];
            let mut f = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(true)
                .open(&file_path)
                .unwrap();
            f.write(&buf).unwrap();
        }

        let init_size = std::fs::metadata(&file_path).unwrap().len() as usize;
        let set_size = init_size + 1024 * 1024 * 50;
        let f = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&file_path)
            .unwrap();

        let uring = URingContext::new(16).unwrap();
        uring
            .add_fallocate(f.as_raw_fd(), 0, set_size as u64, 0, 66)
            .unwrap();
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 66_u64);
        match res {
            Err(e) => {
                if e.kind() == std::io::ErrorKind::InvalidInput {
                    // Skip on kernels that don't support fallocate.
                    return;
                }
                panic!("Unexpected fallocate error: {}", e);
            }
            Ok(val) => assert_eq!(val, 0_u32),
        }

        // Add a few writes and then fsync.
        let buf = [0u8; 4096];
        let mut pending = std::collections::BTreeSet::new();
        unsafe {
            uring
                .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 0, 67)
                .unwrap();
            pending.insert(67u64);
            uring
                .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 4096, 68)
                .unwrap();
            pending.insert(68);
            uring
                .add_write(buf.as_ptr(), buf.len(), f.as_raw_fd(), 8192, 69)
                .unwrap();
            pending.insert(69);
        }
        uring.add_fsync(f.as_raw_fd(), 70).unwrap();
        pending.insert(70);

        let mut wait_calls = 0;

        while !pending.is_empty() && wait_calls < 5 {
            let events = uring.wait().unwrap();
            for (user_data, res) in events {
                assert!(res.is_ok());
                assert!(pending.contains(&user_data));
                pending.remove(&user_data);
            }
            wait_calls += 1;
        }
        assert!(pending.is_empty());

        uring
            .add_fallocate(
                f.as_raw_fd(),
                init_size as u64,
                (set_size - init_size) as u64,
                (libc::FALLOC_FL_PUNCH_HOLE | libc::FALLOC_FL_KEEP_SIZE) as u32,
                68,
            )
            .unwrap();
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 68_u64);
        assert_eq!(res.unwrap(), 0_u32);

        drop(f); // Close to ensure directory entries for metadata are updated.

        let new_size = std::fs::metadata(&file_path).unwrap().len() as usize;
        assert_eq!(new_size, set_size);
    }

    #[test]
    fn dev_zero_readable() {
        let f = File::open(Path::new("/dev/zero")).unwrap();
        let uring = URingContext::new(16).unwrap();
        uring
            .add_poll_fd(f.as_raw_fd(), &WatchingEvents::empty().set_read(), 454)
            .unwrap();
        let (user_data, res) = uring.wait().unwrap().next().unwrap();
        assert_eq!(user_data, 454_u64);
        assert_eq!(res.unwrap(), 1_u32);
    }

    #[test]
    fn queue_many_ebusy_retry() {
        let num_entries = 16;
        let f = File::open(Path::new("/dev/zero")).unwrap();
        let uring = URingContext::new(num_entries).unwrap();
        // Fill the submit ring.
        for sqe_batch in 0..3 {
            for i in 0..num_entries {
                uring
                    .add_poll_fd(
                        f.as_raw_fd(),
                        &WatchingEvents::empty().set_read(),
                        (sqe_batch * num_entries + i) as u64,
                    )
                    .unwrap();
            }
            uring.submit().unwrap();
        }
        // Adding more than the number of cqes will cause the uring to return EBUSY; make sure that
        // is handled cleanly and wait still returns the completed entries.
        uring
            .add_poll_fd(
                f.as_raw_fd(),
                &WatchingEvents::empty().set_read(),
                (num_entries * 3) as u64,
            )
            .unwrap();
        // The first wait call should return the cqes that are already filled.
        {
            let mut results = uring.wait().unwrap();
            for _i in 0..num_entries * 2 {
                assert_eq!(results.next().unwrap().1.unwrap(), 1_u32);
            }
            assert!(results.next().is_none());
        }
        // The second will finish submitting any more sqes and return the rest.
        let mut results = uring.wait().unwrap();
        for _i in 0..num_entries + 1 {
            assert_eq!(results.next().unwrap().1.unwrap(), 1_u32);
        }
        assert!(results.next().is_none());
    }

    #[test]
    fn wake_with_nop() {
        const PIPE_READ: UserData = 0;
        const NOP: UserData = 1;
        const BUF_DATA: [u8; 16] = [0xf4; 16];

        let uring = URingContext::new(4).map(Arc::new).unwrap();
        let (pipe_out, mut pipe_in) = pipe(true).unwrap();
        let (tx, rx) = channel();

        let uring2 = uring.clone();
        let wait_thread = thread::spawn(move || {
            let mut buf = [0u8; BUF_DATA.len()];
            unsafe {
                uring2
                    .add_read(buf.as_mut_ptr(), buf.len(), pipe_out.as_raw_fd(), 0, 0)
                    .unwrap();
            }

            // This is still a bit racy as the other thread may end up adding the NOP before we
            // make the syscall but I'm not aware of a mechanism that will notify the other thread
            // exactly when we make the syscall.
            tx.send(()).unwrap();
            let mut events = uring2.wait().unwrap();
            let (user_data, result) = events.next().unwrap();
            assert_eq!(user_data, NOP);
            assert_eq!(result.unwrap(), 0);

            tx.send(()).unwrap();
            let mut events = uring2.wait().unwrap();
            let (user_data, result) = events.next().unwrap();
            assert_eq!(user_data, PIPE_READ);
            assert_eq!(result.unwrap(), buf.len() as u32);
            assert_eq!(&buf, &BUF_DATA);
        });

        // Wait until the other thread is about to make the syscall.
        rx.recv_timeout(Duration::from_secs(10)).unwrap();

        // Now add a NOP operation. This should wake up the other thread even though it cannot yet
        // read from the pipe.
        uring.add_nop(NOP).unwrap();
        uring.submit().unwrap();

        // Wait for the other thread to process the NOP result.
        rx.recv_timeout(Duration::from_secs(10)).unwrap();

        // Now write to the pipe to finish the uring read.
        pipe_in.write_all(&BUF_DATA).unwrap();

        wait_thread.join().unwrap();
    }

    #[test]
    fn complete_from_any_thread() {
        let num_entries = 16;
        let uring = URingContext::new(num_entries).map(Arc::new).unwrap();

        // Fill the submit ring.
        for sqe_batch in 0..3 {
            for i in 0..num_entries {
                uring.add_nop((sqe_batch * num_entries + i) as u64).unwrap();
            }
            uring.submit().unwrap();
        }

        // Spawn a bunch of threads that pull cqes out of the uring and make sure none of them see
        // a duplicate.
        const NUM_THREADS: usize = 7;
        let completed = Arc::new(Mutex::new(BTreeSet::new()));
        let cv = Arc::new(Condvar::new());
        let barrier = Arc::new(Barrier::new(NUM_THREADS));

        let mut threads = Vec::with_capacity(NUM_THREADS);
        for _ in 0..NUM_THREADS {
            let uring = uring.clone();
            let completed = completed.clone();
            let barrier = barrier.clone();
            let cv = cv.clone();
            threads.push(thread::spawn(move || {
                barrier.wait();

                'wait: while completed.lock().len() < num_entries * 3 {
                    for (user_data, result) in uring.wait().unwrap() {
                        assert_eq!(result.unwrap(), 0);

                        let mut completed = completed.lock();
                        assert!(completed.insert(user_data));
                        if completed.len() >= num_entries * 3 {
                            break 'wait;
                        }
                    }
                }

                cv.notify_one();
            }));
        }

        // Wait until all the operations have completed.
        let mut c = completed.lock();
        while c.len() < num_entries * 3 {
            c = cv.wait(c);
        }
        mem::drop(c);

        // Let the OS clean up the still-waiting threads after the test run.
    }

    #[test]
    fn submit_from_any_thread() {
        const NUM_THREADS: usize = 7;
        const ITERATIONS: usize = 113;
        const NUM_ENTRIES: usize = 16;

        fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
            let mut in_flight = in_flight.lock();
            while *in_flight > NUM_ENTRIES as isize {
                in_flight = cv.wait(in_flight);
            }
        }

        let uring = URingContext::new(NUM_ENTRIES).map(Arc::new).unwrap();
        let in_flight = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let mut threads = Vec::with_capacity(NUM_THREADS);
        for idx in 0..NUM_THREADS {
            let uring = uring.clone();
            let in_flight = in_flight.clone();
            let cv = cv.clone();
            threads.push(thread::spawn(move || {
                for iter in 0..ITERATIONS {
                    loop {
                        match uring.add_nop(((idx * NUM_THREADS) + iter) as UserData) {
                            Ok(()) => *in_flight.lock() += 1,
                            Err(Error::NoSpace) => {
                                wait_for_completion_thread(&in_flight, &cv);
                                continue;
                            }
                            Err(e) => panic!("Failed to add nop: {}", e),
                        }

                        // We don't need to wait for the completion queue if the submit fails with
                        // EBUSY because we already added the operation to the submit queue. It
                        // will get added eventually.
                        match uring.submit() {
                            Ok(()) => break,
                            Err(Error::RingEnter(libc::EBUSY)) => break,
                            Err(e) => panic!("Failed to submit ops: {}", e),
                        }
                    }
                }
            }));
        }

        let mut completed = 0;
        while completed < NUM_THREADS * ITERATIONS {
            for (_, res) in uring.wait().unwrap() {
                assert_eq!(res.unwrap(), 0);
                completed += 1;

                let mut in_flight = in_flight.lock();
                *in_flight -= 1;
                let notify_submitters = *in_flight <= NUM_ENTRIES as isize;
                mem::drop(in_flight);

                if notify_submitters {
                    cv.notify_all();
                }

                if completed >= NUM_THREADS * ITERATIONS {
                    break;
                }
            }
        }

        for t in threads {
            t.join().unwrap();
        }

        // Make sure we didn't submit more entries than expected.
        assert_eq!(*in_flight.lock(), 0);
        assert_eq!(uring.submit_ring.lock().added, 0);
        assert_eq!(uring.complete_ring.num_ready(), 0);
        assert_eq!(
            uring.stats.total_ops.load(Ordering::Relaxed),
            (NUM_THREADS * ITERATIONS) as u64
        );
    }

    // TODO(b/183722981): Fix and re-enable test
    #[test]
    #[ignore]
    fn multi_thread_submit_and_complete() {
        const NUM_SUBMITTERS: usize = 7;
        const NUM_COMPLETERS: usize = 3;
        const ITERATIONS: usize = 113;
        const NUM_ENTRIES: usize = 16;

        fn wait_for_completion_thread(in_flight: &Mutex<isize>, cv: &Condvar) {
            let mut in_flight = in_flight.lock();
            while *in_flight > NUM_ENTRIES as isize {
                in_flight = cv.wait(in_flight);
            }
        }

        let uring = URingContext::new(NUM_ENTRIES).map(Arc::new).unwrap();
        let in_flight = Arc::new(Mutex::new(0));
        let cv = Arc::new(Condvar::new());

        let mut threads = Vec::with_capacity(NUM_SUBMITTERS + NUM_COMPLETERS);
        for idx in 0..NUM_SUBMITTERS {
            let uring = uring.clone();
            let in_flight = in_flight.clone();
            let cv = cv.clone();
            threads.push(thread::spawn(move || {
                for iter in 0..ITERATIONS {
                    loop {
                        match uring.add_nop(((idx * NUM_SUBMITTERS) + iter) as UserData) {
                            Ok(()) => *in_flight.lock() += 1,
                            Err(Error::NoSpace) => {
                                wait_for_completion_thread(&in_flight, &cv);
                                continue;
                            }
                            Err(e) => panic!("Failed to add nop: {}", e),
                        }

                        // We don't need to wait for the completion queue if the submit fails with
                        // EBUSY because we already added the operation to the submit queue. It
                        // will get added eventually.
                        match uring.submit() {
                            Ok(()) => break,
                            Err(Error::RingEnter(libc::EBUSY)) => break,
                            Err(e) => panic!("Failed to submit ops: {}", e),
                        }
                    }
                }
            }));
        }

        let completed = Arc::new(AtomicUsize::new(0));
        for _ in 0..NUM_COMPLETERS {
            let uring = uring.clone();
            let in_flight = in_flight.clone();
            let cv = cv.clone();
            let completed = completed.clone();
            threads.push(thread::spawn(move || {
                while completed.load(Ordering::Relaxed) < NUM_SUBMITTERS * ITERATIONS {
                    for (_, res) in uring.wait().unwrap() {
                        assert_eq!(res.unwrap(), 0);
                        completed.fetch_add(1, Ordering::Relaxed);

                        let mut in_flight = in_flight.lock();
                        *in_flight -= 1;
                        let notify_submitters = *in_flight <= NUM_ENTRIES as isize;
                        mem::drop(in_flight);

                        if notify_submitters {
                            cv.notify_all();
                        }

                        if completed.load(Ordering::Relaxed) >= NUM_SUBMITTERS * ITERATIONS {
                            break;
                        }
                    }
                }
            }));
        }

        for t in threads.drain(..NUM_SUBMITTERS) {
            t.join().unwrap();
        }

        // Now that all submitters are finished, add NOPs to wake up any completers blocked on the
        // syscall.
        for i in 0..NUM_COMPLETERS {
            uring
                .add_nop((NUM_SUBMITTERS * ITERATIONS + i) as UserData)
                .unwrap();
        }
        uring.submit().unwrap();

        for t in threads {
            t.join().unwrap();
        }

        // Make sure we didn't submit more entries than expected. Only the last few NOPs added to
        // wake up the completer threads may still be in the completion ring.
        assert!(uring.complete_ring.num_ready() <= NUM_COMPLETERS as u32);
        assert_eq!(
            in_flight.lock().abs() as u32 + uring.complete_ring.num_ready(),
            NUM_COMPLETERS as u32
        );
        assert_eq!(uring.submit_ring.lock().added, 0);
        assert_eq!(
            uring.stats.total_ops.load(Ordering::Relaxed),
            (NUM_SUBMITTERS * ITERATIONS + NUM_COMPLETERS) as u64
        );
    }
}