// Copyright 2020 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! `URingExecutor`
//!
//! The executor runs all given futures to completion. Futures register wakers associated with
//! io_uring operations. A waker is called when the set of uring ops the waker is waiting on
//! completes.
//!
//! `URingExecutor` is meant to be used with the `futures-rs` crate, which provides combinators and
//! utility functions to combine futures. In general, the interface provided by `URingExecutor`
//! shouldn't be used directly. Instead, use it by interacting with implementors of `IoSource` and
//! the high-level future functions.
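//!
//! As a rough usage sketch (not taken from this crate's docs; it relies only on the public
//! `URingExecutor` API defined below), a future can be driven to completion with `run_until`:
//!
//! ```ignore
//! let ex = URingExecutor::new().expect("failed to create executor");
//!
//! // `run_until` polls the given future, along with any tasks spawned on the executor,
//! // submitting and waiting on uring operations as needed, until the future completes.
//! let sum = ex.run_until(async { 2 + 2 }).expect("executor error");
//! assert_eq!(sum, 4);
//! ```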
//!
//! ## Read/Write buffer management.
//!
//! There are two key issues when managing asynchronous IO buffers in Rust:
//! 1) The kernel has a mutable reference to the memory until the completion is returned. Rust must
//!    not have any references to it during that time.
//! 2) The memory must remain valid as long as the kernel has a reference to it.
//!
//! ### The kernel's mutable borrow of the buffer
//!
//! Because the buffers used for read and write must be passed to the kernel for an unknown
//! duration, the functions must maintain ownership of the memory. The core of this problem is that
//! the lifetime of the future isn't tied to the scope in which the kernel can modify the buffer
//! the future has a reference to. The buffer can be modified at any point from submission until
//! the operation completes. The operation can't be synchronously canceled when the future is
//! dropped, and `Drop` can't be used for safety guarantees. To guarantee safety, only memory that
//! implements `BackingMemory` is accepted. For implementors of `BackingMemory` the mutable borrow
//! isn't an issue because they already tolerate external modifications to the memory (like a
//! `VolatileSlice`).
//!
//! ### Buffer lifetime
//!
//! What if the kernel's reference to the buffer outlives the buffer itself? This could happen if
//! a read operation is submitted and then the memory is dropped. To solve this, the executor takes
//! an Arc to the backing memory. Vecs being read to are also wrapped in an Arc before being passed
//! to the executor. The executor holds the Arc and ensures all operations are complete before
//! dropping it, which guarantees the memory is valid for the duration of the operation.
//!
//! The buffers _have_ to be on the heap. Because we don't have a way to cancel a future if it is
//! dropped (we can't rely on `Drop` running), there is no way to ensure the kernel's buffer
//! remains valid until the operation completes unless the executor holds an Arc to the memory on
//! the heap.
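//!
//! A minimal sketch of the resulting ownership, mirroring the `dont_drop_backing_mem_read` test
//! at the bottom of this file (`source` stands in for an already-registered `RegisteredSource`):
//!
//! ```ignore
//! let bm =
//!     Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
//! let op = source.start_read_to_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])?;
//! assert_eq!(Arc::strong_count(&bm), 2); // The executor holds the second reference.
//! drop(op);                              // Dropping the future does not release the buffer.
//! assert_eq!(Arc::strong_count(&bm), 2); // Still held until the kernel completes the op.
//! ```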
//!
//! ## Using `Vec` for reads/writes.
//!
//! There is a convenience wrapper `VecIoWrapper` provided for fully owned vectors. This type
//! ensures that only the kernel is allowed to access the `Vec` and wraps the `Vec` in an Arc to
//! ensure it lives long enough.
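//!
//! For example (a sketch only; the registered `source` and surrounding executor setup are
//! assumed):
//!
//! ```ignore
//! // Wrap the Vec so the executor can hold a reference-counted buffer for the kernel to fill.
//! let buf = Arc::new(VecIoWrapper::from(vec![0u8; 4096]));
//! let op = source.start_read_to_mem(
//!     0,
//!     Arc::clone(&buf) as Arc<dyn BackingMemory + Send + Sync>,
//!     &[MemRegion { offset: 0, len: 4096 }],
//! )?;
//! let bytes_read = op.await?;
//! ```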

use std::convert::TryInto;
use std::fs::File;
use std::future::Future;
use std::io;
use std::mem;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::pin::Pin;
use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::task::{Context, Poll};
use std::thread::{self, ThreadId};

use async_task::Task;
use futures::task::noop_waker;
use io_uring::URingContext;
use pin_utils::pin_mut;
use slab::Slab;
use sync::Mutex;
use sys_util::{warn, WatchingEvents};
use thiserror::Error as ThisError;

use crate::mem::{BackingMemory, MemRegion};
use crate::queue::RunnableQueue;
use crate::waker::{new_waker, WakerToken, WeakWake};

#[derive(Debug, ThisError)]
pub enum Error {
    /// Failed to copy the FD for the polling context.
    #[error("Failed to copy the FD for the polling context: {0}")]
    DuplicatingFd(sys_util::Error),
    /// The Executor is gone.
    #[error("The URingExecutor is gone")]
    ExecutorGone,
    /// Invalid offset or length given for an iovec in backing memory.
    #[error("Invalid offset/len for getting an iovec")]
    InvalidOffset,
    /// Invalid FD source specified.
    #[error("Invalid source, FD not registered for use")]
    InvalidSource,
    /// Error doing the IO.
    #[error("Error during IO: {0}")]
    Io(io::Error),
    /// Creating a context to wait on FDs failed.
    #[error("Error creating the fd waiting context: {0}")]
    CreatingContext(io_uring::Error),
    /// Failed to remove the waker from the polling context.
    #[error("Error removing from the URing context: {0}")]
    RemovingWaker(io_uring::Error),
    /// Failed to submit the operation to the polling context.
    #[error("Error adding to the URing context: {0}")]
    SubmittingOp(io_uring::Error),
    /// URingContext failure.
    #[error("URingContext failure: {0}")]
    URingContextError(io_uring::Error),
    /// Failed to submit or wait for io_uring events.
    #[error("URing::enter: {0}")]
    URingEnter(io_uring::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

// Checks if the uring executor is available.
// Caches the result so that the check is only run once.
// Useful for falling back to the FD executor on pre-uring kernels.
pub(crate) fn use_uring() -> bool {
    const UNKNOWN: u32 = 0;
    const URING: u32 = 1;
    const FD: u32 = 2;
    static USE_URING: AtomicU32 = AtomicU32::new(UNKNOWN);
    match USE_URING.load(Ordering::Relaxed) {
        UNKNOWN => {
            // Create a dummy uring context to check that the kernel understands the syscalls.
            if URingContext::new(8).is_ok() {
                USE_URING.store(URING, Ordering::Relaxed);
                true
            } else {
                USE_URING.store(FD, Ordering::Relaxed);
                false
            }
        }
        URING => true,
        FD => false,
        _ => unreachable!("invalid use uring state"),
    }
}

pub struct RegisteredSource {
    tag: usize,
    ex: Weak<RawExecutor>,
}

impl RegisteredSource {
    pub fn start_read_to_mem(
        &self,
        file_offset: u64,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        addrs: &[MemRegion],
    ) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_read_to_vectored(self, mem, file_offset, addrs)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_write_from_mem(
        &self,
        file_offset: u64,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        addrs: &[MemRegion],
    ) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_write_from_vectored(self, mem, file_offset, addrs)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_fallocate(self, offset, len, mode)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_fsync(&self) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_fsync(self)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn poll_fd_readable(&self) -> Result<PendingOperation> {
        let events = WatchingEvents::empty().set_read();

        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_poll(self, &events)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }
}

impl Drop for RegisteredSource {
    fn drop(&mut self) {
        if let Some(ex) = self.ex.upgrade() {
            let _ = ex.deregister_source(self);
        }
    }
}

// Indicates that the executor is either within or about to make an io_uring_enter syscall. When a
// waker sees this value, it will add and submit a NOP to the uring, which will wake up the thread
// blocked on the io_uring_enter syscall.
const WAITING: i32 = 0xb80d_21b5u32 as i32;

// Indicates that the executor is processing any futures that are ready to run.
const PROCESSING: i32 = 0xdb31_83a3u32 as i32;

// Indicates that one or more futures may be ready to make progress.
const WOKEN: i32 = 0x0fc7_8f7eu32 as i32;

// Number of entries in the ring.
const NUM_ENTRIES: usize = 256;

// An operation that has been submitted to the uring and is potentially being waited on.
struct OpData {
    _file: Arc<File>,
    _mem: Option<Arc<dyn BackingMemory + Send + Sync>>,
    waker: Option<Waker>,
    canceled: bool,
}

// The current status of an operation that's been submitted to the uring.
enum OpStatus {
    Nop,
    Pending(OpData),
    Completed(Option<::std::io::Result<u32>>),
}

struct Ring {
    ops: Slab<OpStatus>,
    registered_sources: Slab<Arc<File>>,
}

struct RawExecutor {
    // The URingContext needs to be first so that it is dropped first, closing the uring fd, and
    // releasing the resources borrowed by the kernel before we free them.
    ctx: URingContext,
    queue: RunnableQueue,
    ring: Mutex<Ring>,
    thread_id: Mutex<Option<ThreadId>>,
    state: AtomicI32,
}

impl RawExecutor {
    fn new() -> Result<RawExecutor> {
        Ok(RawExecutor {
            ctx: URingContext::new(NUM_ENTRIES).map_err(Error::CreatingContext)?,
            queue: RunnableQueue::new(),
            ring: Mutex::new(Ring {
                ops: Slab::with_capacity(NUM_ENTRIES),
                registered_sources: Slab::with_capacity(NUM_ENTRIES),
            }),
            thread_id: Mutex::new(None),
            state: AtomicI32::new(PROCESSING),
        })
    }

    fn wake(&self) {
        let oldstate = self.state.swap(WOKEN, Ordering::Release);
        if oldstate == WAITING {
            let mut ring = self.ring.lock();
            let entry = ring.ops.vacant_entry();
            let next_op_token = entry.key();
            if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {
                warn!("Failed to add NOP for waking up executor: {}", e);
            }
            entry.insert(OpStatus::Nop);
            mem::drop(ring);

            match self.ctx.submit() {
                Ok(()) => {}
                // If the kernel's submit ring is full then we know we won't block when calling
                // io_uring_enter, which is all we really care about.
                Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
                Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),
            }
        }
    }

    fn spawn<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn(f, schedule);
        runnable.schedule();
        task
    }

    fn spawn_local<F>(self: &Arc<Self>, f: F) -> Task<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn_local(f, schedule);
        runnable.schedule();
        task
    }

    fn runs_tasks_on_current_thread(&self) -> bool {
        let executor_thread = self.thread_id.lock();
        executor_thread
            .map(|id| id == thread::current().id())
            .unwrap_or(false)
    }

    fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
        let current_thread = thread::current().id();
        let mut thread_id = self.thread_id.lock();
        assert_eq!(
            *thread_id.get_or_insert(current_thread),
            current_thread,
            "`URingExecutor::run` cannot be called from more than one thread"
        );
        mem::drop(thread_id);

        pin_mut!(done);
        loop {
            self.state.store(PROCESSING, Ordering::Release);
            for runnable in self.queue.iter() {
                runnable.run();
            }

            if let Poll::Ready(val) = done.as_mut().poll(cx) {
                return Ok(val);
            }

            let oldstate = self.state.compare_exchange(
                PROCESSING,
                WAITING,
                Ordering::Acquire,
                Ordering::Acquire,
            );
            if let Err(oldstate) = oldstate {
                debug_assert_eq!(oldstate, WOKEN);
                // One or more futures have become runnable.
                continue;
            }

            let events = self.ctx.wait().map_err(Error::URingEnter)?;

            // Set the state back to PROCESSING to prevent any tasks woken up by the loop below
            // from submitting unnecessary NOPs to wake up the executor.
            self.state.store(PROCESSING, Ordering::Release);

            let mut ring = self.ring.lock();
            for (raw_token, result) in events {
                // While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
                // something that we originally gave to the kernel and that was created from a
                // `usize` so we should always be able to convert it back into a `usize`.
                let token = raw_token
                    .try_into()
                    .expect("`u64` doesn't fit inside a `usize`");

                let op = ring
                    .ops
                    .get_mut(token)
                    .expect("Received completion token for unexpected operation");
                match mem::replace(op, OpStatus::Completed(Some(result))) {
                    // No one is waiting on a Nop.
                    OpStatus::Nop => mem::drop(ring.ops.remove(token)),
                    OpStatus::Pending(data) => {
                        if data.canceled {
                            // No one is waiting for this operation and the uring is done with
                            // it so it's safe to remove.
                            ring.ops.remove(token);
                        }
                        if let Some(waker) = data.waker {
                            waker.wake();
                        }
                    }
                    OpStatus::Completed(_) => panic!("uring operation completed more than once"),
                }
            }
        }
    }

    fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {
        let mut ring = self.ring.lock();

        let op = ring
            .ops
            .get_mut(token.0)
            .expect("`get_result` called on unknown operation");
        match op {
            OpStatus::Nop => panic!("`get_result` called on nop"),
            OpStatus::Pending(data) => {
                if data.canceled {
                    panic!("`get_result` called on canceled operation");
                }
                data.waker = Some(cx.waker().clone());
                None
            }
            OpStatus::Completed(res) => {
                let out = res.take();
                ring.ops.remove(token.0);
                Some(out.expect("Missing result in completed operation"))
            }
        }
    }

    // Remove the waker for the given token if it hasn't fired yet.
    fn cancel_operation(&self, token: WakerToken) {
        let mut ring = self.ring.lock();
        if let Some(op) = ring.ops.get_mut(token.0) {
            match op {
                OpStatus::Nop => panic!("`cancel_operation` called on nop"),
                OpStatus::Pending(data) => {
                    if data.canceled {
                        panic!("uring operation canceled more than once");
                    }

                    // Clear the waker as it is no longer needed.
                    data.waker = None;
                    data.canceled = true;

                    // Keep the rest of the op data as the uring might still be accessing either
                    // the source or the backing memory, so it needs to live until the kernel
                    // completes the operation. TODO: cancel the operation in the uring.
                }
                OpStatus::Completed(_) => {
                    ring.ops.remove(token.0);
                }
            }
        }
    }

    fn register_source(&self, f: Arc<File>) -> usize {
        self.ring.lock().registered_sources.insert(f)
    }

    fn deregister_source(&self, source: &RegisteredSource) {
        // There isn't any need to pull pending ops out; they all have Arcs to the file and mem
        // they need, so let them complete. Deregistering a source with pending ops is not a
        // common path, so there's no need to optimize that case yet.
        self.ring.lock().registered_sources.remove(source.tag);
    }

    fn submit_poll(
        &self,
        source: &RegisteredSource,
        events: &sys_util::WatchingEvents,
    ) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_poll_fd(src.as_raw_fd(), events, usize_to_u64(next_op_token))
            .map_err(Error::SubmittingOp)?;

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_fallocate(
        &self,
        source: &RegisteredSource,
        offset: u64,
        len: u64,
        mode: u32,
    ) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_fallocate(
                src.as_raw_fd(),
                offset,
                len,
                mode,
                usize_to_u64(next_op_token),
            )
            .map_err(Error::SubmittingOp)?;

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_fsync(src.as_raw_fd(), usize_to_u64(next_op_token))
            .map_err(Error::SubmittingOp)?;

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_read_to_vectored(
        &self,
        source: &RegisteredSource,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        offset: u64,
        addrs: &[MemRegion],
    ) -> Result<WakerToken> {
        if addrs
            .iter()
            .any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
        {
            return Err(Error::InvalidOffset);
        }

        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;

        // We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();

        // The addresses have already been validated above, so unwrapping them will succeed.
        let iovecs = addrs
            .iter()
            .map(|&mem_range| *mem.get_volatile_slice(mem_range).unwrap().as_iobuf());

        unsafe {
            // Safe because all the addresses are within the Memory that an Arc is kept for the
            // duration to ensure the memory is valid while the kernel accesses it.
            // Tested by `dont_drop_backing_mem_read` unit test.
            self.ctx
                .add_readv_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
                .map_err(Error::SubmittingOp)?;
        }

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: Some(mem),
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_write_from_vectored(
        &self,
        source: &RegisteredSource,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        offset: u64,
        addrs: &[MemRegion],
    ) -> Result<WakerToken> {
        if addrs
            .iter()
            .any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
        {
            return Err(Error::InvalidOffset);
        }

        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;

        // We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();

        // The addresses have already been validated above, so unwrapping them will succeed.
        let iovecs = addrs
            .iter()
            .map(|&mem_range| *mem.get_volatile_slice(mem_range).unwrap().as_iobuf());

        unsafe {
            // Safe because all the addresses are within the Memory that an Arc is kept for the
            // duration to ensure the memory is valid while the kernel accesses it.
            // Tested by `dont_drop_backing_mem_write` unit test.
            self.ctx
                .add_writev_iter(iovecs, src.as_raw_fd(), offset, usize_to_u64(next_op_token))
                .map_err(Error::SubmittingOp)?;
        }

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: Some(mem),
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }
}

impl WeakWake for RawExecutor {
    fn wake_by_ref(weak_self: &Weak<Self>) {
        if let Some(arc_self) = weak_self.upgrade() {
            RawExecutor::wake(&arc_self);
        }
    }
}

impl Drop for RawExecutor {
    fn drop(&mut self) {
        // Wake up any futures still waiting on uring operations.
        let ring = self.ring.get_mut();
        for (_, op) in ring.ops.iter_mut() {
            match op {
                OpStatus::Nop => {}
                OpStatus::Pending(data) => {
                    // If the operation wasn't already canceled then wake up the future waiting on
                    // it. When polled that future will get an ExecutorGone error anyway so there's
                    // no point in waiting until the operation completes to wake it up.
                    if !data.canceled {
                        if let Some(waker) = data.waker.take() {
                            waker.wake();
                        }
                    }

                    data.canceled = true;
                }
                OpStatus::Completed(_) => {}
            }
        }

        // Since the RawExecutor is wrapped in an Arc it may end up being dropped from a different
        // thread than the one that called `run` or `run_until`. Since we know there are no other
        // references, just clear the thread id so that we don't panic.
        *self.thread_id.lock() = None;

        // Now run the executor loop once more to poll any futures we just woke up.
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let res = self.run(&mut cx, async {});

        if let Err(e) = res {
            warn!("Failed to drive uring to completion: {}", e);
        }
    }
}

/// An executor that uses io_uring for its asynchronous I/O operations. See the documentation of
/// `Executor` for more details about the methods.
#[derive(Clone)]
pub struct URingExecutor {
    raw: Arc<RawExecutor>,
}

impl URingExecutor {
    pub fn new() -> Result<URingExecutor> {
        let raw = RawExecutor::new().map(Arc::new)?;

        Ok(URingExecutor { raw })
    }

    pub fn spawn<F>(&self, f: F) -> Task<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.raw.spawn(f)
    }

    pub fn spawn_local<F>(&self, f: F) -> Task<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        self.raw.spawn_local(f)
    }

    pub fn run(&self) -> Result<()> {
        let waker = new_waker(Arc::downgrade(&self.raw));
        let mut cx = Context::from_waker(&waker);

        self.raw.run(&mut cx, crate::empty::<()>())
    }

    pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
        let waker = new_waker(Arc::downgrade(&self.raw));
        let mut ctx = Context::from_waker(&waker);
        self.raw.run(&mut ctx, f)
    }

    /// Register a file and memory pair for buffered asynchronous operation.
    pub(crate) fn register_source<F: AsRawFd>(&self, fd: &F) -> Result<RegisteredSource> {
        let duped_fd = unsafe {
            // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
            // will only be added to the poll loop.
            File::from_raw_fd(dup_fd(fd.as_raw_fd())?)
        };

        Ok(RegisteredSource {
            tag: self.raw.register_source(Arc::new(duped_fd)),
            ex: Arc::downgrade(&self.raw),
        })
    }
}

// Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while
// waiting in TLS to be added to the main polling context.
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
    let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
    if ret < 0 {
        Err(Error::DuplicatingFd(sys_util::Error::last()))
    } else {
        Ok(ret)
    }
}

// Converts a `usize` into a `u64` and panics if the conversion fails.
#[inline]
fn usize_to_u64(val: usize) -> u64 {
    val.try_into().expect("`usize` doesn't fit inside a `u64`")
}

pub struct PendingOperation {
    waker_token: Option<WakerToken>,
    ex: Weak<RawExecutor>,
    submitted: bool,
}

impl Future for PendingOperation {
    type Output = Result<u32>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let token = self
            .waker_token
            .as_ref()
            .expect("PendingOperation polled after returning Poll::Ready");
        if let Some(ex) = self.ex.upgrade() {
            if let Some(result) = ex.get_result(token, cx) {
                self.waker_token = None;
                Poll::Ready(result.map_err(Error::Io))
            } else {
                // If we haven't submitted the operation yet, and the executor runs on a different
                // thread then submit it now. Otherwise the executor will submit it automatically
                // the next time it calls UringContext::wait.
                if !self.submitted && !ex.runs_tasks_on_current_thread() {
                    match ex.ctx.submit() {
                        Ok(()) => self.submitted = true,
                        // If the kernel ring is full then wait until some ops are removed from the
                        // completion queue. This op should get submitted the next time the executor
                        // calls UringContext::wait.
                        Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
                        Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
                    }
                }
                Poll::Pending
            }
        } else {
            Poll::Ready(Err(Error::ExecutorGone))
        }
    }
}

impl Drop for PendingOperation {
    fn drop(&mut self) {
        if let Some(waker_token) = self.waker_token.take() {
            if let Some(ex) = self.ex.upgrade() {
                ex.cancel_operation(waker_token);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::io::{Read, Write};
    use std::mem;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    use futures::executor::block_on;

    use super::*;
    use crate::mem::{BackingMemory, MemRegion, VecIoWrapper};

    // A future that returns ready when the uring queue is empty.
    struct UringQueueEmpty<'a> {
        ex: &'a URingExecutor,
    }

    impl<'a> Future for UringQueueEmpty<'a> {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            if self.ex.raw.ring.lock().ops.is_empty() {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        }
    }

    #[test]
    fn dont_drop_backing_mem_read() {
        // Create a backing memory wrapped in an Arc and check that the drop isn't called while
        // the op is pending.
        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;

        // Use pipes to create a future that will block forever.
        let (rx, mut tx) = sys_util::pipe(true).unwrap();

        // Set up the TLS for the uring_executor by creating one.
        let ex = URingExecutor::new().unwrap();

        // Register the receive side of the pipe with the executor.
        let registered_source = ex.register_source(&rx).expect("register source failed");

        // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
        // duration of the op.
        let pending_op = registered_source
            .start_read_to_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start read to mem");

        // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
        // reference while the op is active.
        assert_eq!(Arc::strong_count(&bm), 2);

        // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be
        // using it.
        drop(pending_op);
        assert_eq!(Arc::strong_count(&bm), 2);

        // Finishing the operation should put the Arc count back to 1.
        // Write to the pipe to wake the read pipe and then wait for the uring result in the
        // executor.
        tx.write_all(&[0u8; 8]).expect("write failed");
        ex.run_until(UringQueueEmpty { ex: &ex })
            .expect("Failed to wait for read pipe ready");
        assert_eq!(Arc::strong_count(&bm), 1);
    }

    #[test]
    fn dont_drop_backing_mem_write() {
        // Create a backing memory wrapped in an Arc and check that the drop isn't called while
        // the op is pending.
        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;

        // Use pipes to create a future that will block forever.
        let (mut rx, tx) = sys_util::new_pipe_full().expect("Pipe failed");

        // Set up the TLS for the uring_executor by creating one.
        let ex = URingExecutor::new().unwrap();

        // Register the send side of the pipe with the executor.
        let registered_source = ex.register_source(&tx).expect("register source failed");

        // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
        // duration of the op.
        let pending_op = registered_source
            .start_write_from_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start write to mem");

        // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
        // reference while the op is active.
        assert_eq!(Arc::strong_count(&bm), 2);

        // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be
        // using it.
        drop(pending_op);
        assert_eq!(Arc::strong_count(&bm), 2);

        // Finishing the operation should put the Arc count back to 1.
        // Read from the pipe to free up space so the blocked write can complete, then wait for
        // the uring result in the executor.
        let mut buf = vec![0u8; sys_util::round_up_to_page_size(1)];
        rx.read_exact(&mut buf).expect("read to empty failed");
        ex.run_until(UringQueueEmpty { ex: &ex })
            .expect("Failed to wait for write pipe ready");
        assert_eq!(Arc::strong_count(&bm), 1);
    }

    #[test]
    fn canceled_before_completion() {
        async fn cancel_io(op: PendingOperation) {
            mem::drop(op);
        }

        async fn check_result(op: PendingOperation, expected: u32) {
            let actual = op.await.expect("operation failed to complete");
            assert_eq!(expected, actual);
        }

        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;

        let (rx, tx) = sys_util::pipe(true).expect("Pipe failed");

        let ex = URingExecutor::new().unwrap();

        let rx_source = ex.register_source(&rx).expect("register source failed");
        let tx_source = ex.register_source(&tx).expect("register source failed");

        let read_task = rx_source
            .start_read_to_mem(0, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start read to mem");

        ex.spawn_local(cancel_io(read_task)).detach();

        // Write to the pipe so that the kernel operation will complete.
        let buf =
            Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
        let write_task = tx_source
            .start_write_from_mem(0, Arc::clone(&buf), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start write from mem");

        ex.run_until(check_result(write_task, 8))
            .expect("Failed to run executor");
    }

    #[test]
    fn drop_before_completion() {
        const VALUE: u64 = 0xef6c_a8df_b842_eb9c;

        async fn check_op(op: PendingOperation) {
            let err = op.await.expect_err("Op completed successfully");
            match err {
                Error::ExecutorGone => {}
                e => panic!("Unexpected error from op: {}", e),
            }
        }

        let (mut rx, mut tx) = sys_util::pipe(true).expect("Pipe failed");

        let ex = URingExecutor::new().unwrap();

        let tx_source = ex.register_source(&tx).expect("Failed to register source");
        let bm = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));
        let op = tx_source
            .start_write_from_mem(
                0,
                bm,
                &[MemRegion {
                    offset: 0,
                    len: mem::size_of::<u64>(),
                }],
            )
            .expect("Failed to start write from mem");

        ex.spawn_local(check_op(op)).detach();

        // Now drop the executor. It shouldn't run the write operation.
        mem::drop(ex);

        // Make sure the executor did not complete the uring operation.
        let new_val = [0x2e; 8];
        tx.write_all(&new_val).unwrap();

        let mut buf = 0u64.to_ne_bytes();
        rx.read_exact(&mut buf[..])
            .expect("Failed to read from pipe");

        assert_eq!(buf, new_val);
    }

    #[test]
    fn drop_on_different_thread() {
        let ex = URingExecutor::new().unwrap();

        let ex2 = ex.clone();
        let t = thread::spawn(move || ex2.run_until(async {}));

        t.join().unwrap().unwrap();

        // Leave an uncompleted operation in the queue so that the drop impl will try to drive it
        // to completion.
        let (_rx, tx) = sys_util::pipe(true).expect("Pipe failed");
        let tx = ex.register_source(&tx).expect("Failed to register source");
        let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
        let op = tx
            .start_write_from_mem(
                0,
                bm,
                &[MemRegion {
                    offset: 0,
                    len: mem::size_of::<u64>(),
                }],
            )
            .expect("Failed to start write from mem");

        mem::drop(ex);

        match block_on(op).expect_err("Pending operation completed after executor was dropped") {
            Error::ExecutorGone => {}
            e => panic!("Unexpected error after dropping executor: {}", e),
        }
    }
}