// Copyright 2020 The ChromiumOS Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! `URingExecutor`
//!
//! The executor runs all given futures to completion. Futures register wakers associated with
//! io_uring operations. A waker is called when the set of uring ops the waker is waiting on
//! completes.
//!
//! `URingExecutor` is meant to be used with the `futures-rs` crate that provides combinators and
//! utility functions to combine futures. In general, the interface provided by `URingExecutor`
//! shouldn't be used directly. Instead, use it by interacting with implementors of `IoSource` and
//! the high-level future functions.
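//!
//! A minimal sketch of driving a future to completion with this executor (using only
//! `URingExecutor::new` and `run_until` from this module; marked `ignore` so it is not run as a
//! doctest):
//!
//! ```ignore
//! let ex = URingExecutor::new().expect("failed to create uring executor");
//! // `run_until` drives spawned tasks and the given future until the future completes.
//! let sum = ex.run_until(async { 2 + 2 }).expect("run_until failed");
//! assert_eq!(sum, 4);
//! ```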
//!
//! ## Read/Write buffer management.
//!
//! There are two key issues managing asynchronous IO buffers in Rust.
//! 1) The kernel has a mutable reference to the memory until the completion is returned. Rust must
//! not have any references to it during that time.
//! 2) The memory must remain valid as long as the kernel has a reference to it.
//!
//! ### The kernel's mutable borrow of the buffer
//!
//! Because the buffers used for read and write must be passed to the kernel for an unknown
//! duration, the functions must maintain ownership of the memory. The core of this problem is that
//! the lifetime of the future isn't tied to the scope in which the kernel can modify the buffer the
//! future has a reference to. The buffer can be modified at any point from submission until the
//! operation completes. The operation can't be synchronously canceled when the future is dropped,
//! and Drop can't be used for safety guarantees. To ensure this never happens, only memory that
//! implements `BackingMemory` is accepted. For implementors of `BackingMemory` the mut borrow
//! isn't an issue because those are already OK with external modifications to the memory (like a
//! `VolatileSlice`).
//!
//! ### Buffer lifetime
//!
//! What if the kernel's reference to the buffer outlives the buffer itself? This could happen if a
//! read operation was submitted and then the memory is dropped. To solve this, the executor takes
//! an Arc to the backing memory. Vecs being read to are also wrapped in an Arc before being passed
//! to the executor. The executor holds the Arc and ensures all operations are complete before
//! dropping it, which guarantees the memory is valid for the duration.
//!
//! The buffers _have_ to be on the heap. Because we don't have a way to cancel a future if it is
//! dropped (we can't rely on drop running), there is no way to ensure the kernel's buffer remains
//! valid until the operation completes unless the executor holds an Arc to the memory on the heap.
//!
//! ## Using `Vec` for reads/writes.
//!
//! There is a convenience wrapper `VecIoWrapper` provided for fully owned vectors. This type
//! ensures that only the kernel is allowed to access the `Vec` and wraps the `Vec` in an Arc to
//! ensure it lives long enough.
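//!
//! A sketch of the read path described above (modeled on the `dont_drop_backing_mem_read` test at
//! the bottom of this file; `some_fd` is a placeholder for any `AsRawDescriptor` source, and the
//! example is marked `ignore` so it is not run as a doctest):
//!
//! ```ignore
//! // Wrap the Vec so the executor can hold its own Arc to the buffer while the kernel owns it.
//! let mem = Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
//! let ex = URingExecutor::new().expect("failed to create uring executor");
//! let source = ex.register_source(&some_fd).expect("failed to register source");
//! let op = source
//!     .start_read_to_mem(None, Arc::clone(&mem), &[MemRegion { offset: 0, len: 8 }])
//!     .expect("failed to start read");
//! // The executor keeps `mem` alive until the kernel completes the operation, even if `op` is
//! // dropped before then.
//! let bytes_read = ex.run_until(op).expect("run_until failed").expect("read failed");
//! ```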

use std::convert::TryInto;
use std::ffi::CStr;
use std::fs::File;
use std::future::Future;
use std::io;
use std::mem;
use std::mem::MaybeUninit;
use std::os::unix::io::FromRawFd;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Weak;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use std::thread;
use std::thread::ThreadId;

use async_task::Task;
use base::trace;
use base::warn;
use base::AsRawDescriptor;
use base::EventType;
use base::RawDescriptor;
use futures::task::noop_waker;
use io_uring::URingAllowlist;
use io_uring::URingContext;
use io_uring::URingOperation;
use once_cell::sync::Lazy;
use pin_utils::pin_mut;
use remain::sorted;
use slab::Slab;
use sync::Mutex;
use thiserror::Error as ThisError;

use crate::mem::BackingMemory;
use crate::mem::MemRegion;
use crate::queue::RunnableQueue;
use crate::waker::new_waker;
use crate::waker::WakerToken;
use crate::waker::WeakWake;
use crate::BlockingPool;
use crate::DetachedTasks;

#[sorted]
#[derive(Debug, ThisError)]
pub enum Error {
    /// Creating a context to wait on FDs failed.
    #[error("Error creating the fd waiting context: {0}")]
    CreatingContext(io_uring::Error),
    /// Failed to copy the FD for the polling context.
    #[error("Failed to copy the FD for the polling context: {0}")]
    DuplicatingFd(base::Error),
    /// Enabling a context failed.
    #[error("Error enabling the URing context: {0}")]
    EnablingContext(io_uring::Error),
    /// The Executor is gone.
    #[error("The URingExecutor is gone")]
    ExecutorGone,
    /// Invalid offset or length given for an iovec in backing memory.
    #[error("Invalid offset/len for getting an iovec")]
    InvalidOffset,
    /// Invalid FD source specified.
    #[error("Invalid source, FD not registered for use")]
    InvalidSource,
    /// Error doing the IO.
    #[error("Error during IO: {0}")]
    Io(io::Error),
    /// Registering operation restrictions to a uring failed.
    #[error("Error registering restrictions to the URing context: {0}")]
    RegisteringURingRestriction(io_uring::Error),
    /// Failed to remove the waker from the polling context.
    #[error("Error removing from the URing context: {0}")]
    RemovingWaker(io_uring::Error),
    /// Failed to submit the operation to the polling context.
    #[error("Error adding to the URing context: {0}")]
    SubmittingOp(io_uring::Error),
    /// URingContext failure.
    #[error("URingContext failure: {0}")]
    URingContextError(io_uring::Error),
    /// Failed to submit or wait for io_uring events.
    #[error("URing::enter: {0}")]
    URingEnter(io_uring::Error),
}
pub type Result<T> = std::result::Result<T, Error>;

impl From<Error> for io::Error {
    fn from(e: Error) -> Self {
        use Error::*;
        match e {
            DuplicatingFd(e) => e.into(),
            ExecutorGone => io::Error::new(io::ErrorKind::Other, ExecutorGone),
            InvalidOffset => io::Error::new(io::ErrorKind::InvalidInput, InvalidOffset),
            InvalidSource => io::Error::new(io::ErrorKind::InvalidData, InvalidSource),
            Io(e) => e,
            CreatingContext(e) => e.into(),
            RemovingWaker(e) => e.into(),
            SubmittingOp(e) => e.into(),
            URingContextError(e) => e.into(),
            URingEnter(e) => e.into(),
            EnablingContext(e) => e.into(),
            RegisteringURingRestriction(e) => e.into(),
        }
    }
}

static IS_URING_STABLE: Lazy<bool> = Lazy::new(|| {
    let mut utsname = MaybeUninit::zeroed();

    // Safe because this will only modify `utsname` and we check the return value.
    let res = unsafe { libc::uname(utsname.as_mut_ptr()) };
    if res < 0 {
        return false;
    }

    // Safe because the kernel has initialized `utsname`.
    let utsname = unsafe { utsname.assume_init() };

    // Safe because the pointer is valid and the kernel guarantees that this is a valid C string.
    let release = unsafe { CStr::from_ptr(utsname.release.as_ptr()) };

    let mut components = match release.to_str().map(|r| r.split('.').map(str::parse)) {
        Ok(c) => c,
        Err(_) => return false,
    };

    // Kernels older than 5.10 either didn't support io_uring or had bugs in the implementation.
    match (components.next(), components.next()) {
        (Some(Ok(major)), Some(Ok(minor))) if (major, minor) >= (5, 10) => {
            // The kernel version is new enough so check if we can actually make a uring context.
            URingContext::new(8, None).is_ok()
        }
        _ => false,
    }
});

// Checks if the uring executor is stable.
// Caches the result so that the check is only run once.
// Useful for falling back to the FD executor on pre-uring kernels.
pub(crate) fn is_uring_stable() -> bool {
    *IS_URING_STABLE
}
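
// Example (sketch, hypothetical call site): callers typically gate io_uring usage on this check
// and fall back to an FD/epoll-based executor otherwise:
//
//     if is_uring_stable() {
//         // construct a URingExecutor
//     } else {
//         // construct the fallback FD-based executor
//     }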

// Checks uring availability by checking whether uring creation succeeds.
// If uring creation succeeds, it returns `Ok(())`; otherwise it returns an `URingContextError`.
// It fails if the kernel does not support io_uring, though other failure causes are possible.
pub(crate) fn check_uring_availability() -> Result<()> {
    URingContext::new(8, None)
        .map(drop)
        .map_err(Error::URingContextError)
}

pub struct RegisteredSource {
    tag: usize,
    ex: Weak<RawExecutor>,
}

impl RegisteredSource {
    pub fn start_read_to_mem(
        &self,
        file_offset: Option<u64>,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        addrs: &[MemRegion],
    ) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_read_to_vectored(self, mem, file_offset, addrs)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_write_from_mem(
        &self,
        file_offset: Option<u64>,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        addrs: &[MemRegion],
    ) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_write_from_vectored(self, mem, file_offset, addrs)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_fallocate(self, offset, len, mode)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn start_fsync(&self) -> Result<PendingOperation> {
        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_fsync(self)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }

    pub fn poll_fd_readable(&self) -> Result<PendingOperation> {
        let events = EventType::Read;

        let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
        let token = ex.submit_poll(self, events)?;

        Ok(PendingOperation {
            waker_token: Some(token),
            ex: self.ex.clone(),
            submitted: false,
        })
    }
}

impl Drop for RegisteredSource {
    fn drop(&mut self) {
        if let Some(ex) = self.ex.upgrade() {
            ex.deregister_source(self);
        }
    }
}

// Indicates that the executor is either within or about to make an io_uring_enter syscall. When a
// waker sees this value, it will add and submit a NOP to the uring, which will wake up the thread
// blocked on the io_uring_enter syscall.
const WAITING: i32 = 0xb80d_21b5u32 as i32;

// Indicates that the executor is processing any futures that are ready to run.
const PROCESSING: i32 = 0xdb31_83a3u32 as i32;

// Indicates that one or more futures may be ready to make progress.
const WOKEN: i32 = 0x0fc7_8f7eu32 as i32;
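
// Summary of how these states interact (see `wake` and `run` below):
// - `run` stores PROCESSING at the top of each loop iteration and flips PROCESSING -> WAITING
//   just before blocking in `URingContext::wait`.
// - `wake` unconditionally swaps the state to WOKEN; if the previous state was WAITING it also
//   submits a NOP to the ring so the thread blocked in io_uring_enter wakes up.
// - If `run`'s compare-exchange observes WOKEN instead of PROCESSING, it skips the wait and goes
//   back to running ready futures.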

// Number of entries in the ring.
const NUM_ENTRIES: usize = 256;

// An operation that has been submitted to the uring and is potentially being waited on.
struct OpData {
    _file: Arc<File>,
    _mem: Option<Arc<dyn BackingMemory + Send + Sync>>,
    waker: Option<Waker>,
    canceled: bool,
}

// The current status of an operation that's been submitted to the uring.
enum OpStatus {
    Nop,
    Pending(OpData),
    Completed(Option<::std::io::Result<u32>>),
}

struct Ring {
    ops: Slab<OpStatus>,
    registered_sources: Slab<Arc<File>>,
}

struct RawExecutor {
    // The URingContext needs to be first so that it is dropped first, closing the uring fd, and
    // releasing the resources borrowed by the kernel before we free them.
    ctx: URingContext,
    queue: RunnableQueue,
    ring: Mutex<Ring>,
    blocking_pool: BlockingPool,
    thread_id: Mutex<Option<ThreadId>>,
    state: AtomicI32,
    detached_tasks: Mutex<DetachedTasks>,
}

impl RawExecutor {
    fn new() -> Result<RawExecutor> {
        // To enhance security, only allow the operations that the RawExecutor actually submits.
        let mut restrictions = URingAllowlist::new();
        let ops = [
            URingOperation::Writev,
            URingOperation::Readv,
            URingOperation::Nop,
            URingOperation::Fsync,
            URingOperation::Fallocate,
            URingOperation::PollAdd,
            URingOperation::PollRemove,
            URingOperation::AsyncCancel,
        ];
        for op in ops {
            restrictions.allow_submit_operation(op);
        }

        let ctx =
            URingContext::new(NUM_ENTRIES, Some(&restrictions)).map_err(Error::CreatingContext)?;

        Ok(RawExecutor {
            ctx,
            queue: RunnableQueue::new(),
            ring: Mutex::new(Ring {
                ops: Slab::with_capacity(NUM_ENTRIES),
                registered_sources: Slab::with_capacity(NUM_ENTRIES),
            }),
            blocking_pool: Default::default(),
            thread_id: Mutex::new(None),
            state: AtomicI32::new(PROCESSING),
            detached_tasks: Mutex::new(DetachedTasks::new()),
        })
    }

    fn wake(&self) {
        let oldstate = self.state.swap(WOKEN, Ordering::Release);
        if oldstate == WAITING {
            let mut ring = self.ring.lock();
            let entry = ring.ops.vacant_entry();
            let next_op_token = entry.key();
            if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {
                warn!("Failed to add NOP for waking up executor: {}", e);
            }
            entry.insert(OpStatus::Nop);
            mem::drop(ring);

            match self.ctx.submit() {
                Ok(()) => {}
                // If the kernel's submit ring is full then we know we won't block when calling
                // io_uring_enter, which is all we really care about.
                Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
                Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),
            }
        }
    }

    fn spawn<F>(self: &Arc<Self>, f: F) -> UringExecutorTaskHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn(f, schedule);
        runnable.schedule();
        UringExecutorTaskHandle {
            task,
            raw: Arc::downgrade(self),
        }
    }

    fn spawn_local<F>(self: &Arc<Self>, f: F) -> UringExecutorTaskHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        let raw = Arc::downgrade(self);
        let schedule = move |runnable| {
            if let Some(r) = raw.upgrade() {
                r.queue.push_back(runnable);
                r.wake();
            }
        };
        let (runnable, task) = async_task::spawn_local(f, schedule);
        runnable.schedule();
        UringExecutorTaskHandle {
            task,
            raw: Arc::downgrade(self),
        }
    }

    fn spawn_blocking<F, R>(self: &Arc<Self>, f: F) -> UringExecutorTaskHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.spawn(self.blocking_pool.spawn(f))
    }

    fn runs_tasks_on_current_thread(&self) -> bool {
        let executor_thread = self.thread_id.lock();
        executor_thread
            .map(|id| id == thread::current().id())
            .unwrap_or(false)
    }

    fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
        let current_thread = thread::current().id();
        let mut thread_id = self.thread_id.lock();
        assert_eq!(
            *thread_id.get_or_insert(current_thread),
            current_thread,
            "`URingExecutor::run` cannot be called from more than one thread"
        );
        mem::drop(thread_id);

        pin_mut!(done);
        loop {
            self.state.store(PROCESSING, Ordering::Release);
            for runnable in self.queue.iter() {
                runnable.run();
            }

            if let Ok(mut tasks) = self.detached_tasks.try_lock() {
                tasks.poll(cx);
            }

            if let Poll::Ready(val) = done.as_mut().poll(cx) {
                return Ok(val);
            }

            let oldstate = self.state.compare_exchange(
                PROCESSING,
                WAITING,
                Ordering::Acquire,
                Ordering::Acquire,
            );
            if let Err(oldstate) = oldstate {
                debug_assert_eq!(oldstate, WOKEN);
                // One or more futures have become runnable.
                continue;
            }

            trace!(
                "Waiting on events, {} pending ops",
                self.ring.lock().ops.len()
            );
            let events = self.ctx.wait().map_err(Error::URingEnter)?;

            // Set the state back to PROCESSING to prevent any tasks woken up by the loop below
            // from submitting a wake-up NOP to the ring while we are already awake.
            self.state.store(PROCESSING, Ordering::Release);

            let mut ring = self.ring.lock();
            for (raw_token, result) in events {
                // While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
                // something that we originally gave to the kernel and that was created from a
                // `usize` so we should always be able to convert it back into a `usize`.
                let token = raw_token
                    .try_into()
                    .expect("`u64` doesn't fit inside a `usize`");

                let op = ring
                    .ops
                    .get_mut(token)
                    .expect("Received completion token for unexpected operation");
                match mem::replace(op, OpStatus::Completed(Some(result))) {
                    // No one is waiting on a Nop.
                    OpStatus::Nop => mem::drop(ring.ops.remove(token)),
                    OpStatus::Pending(data) => {
                        if data.canceled {
                            // No one is waiting for this operation and the uring is done with
                            // it so it's safe to remove.
                            ring.ops.remove(token);
                        }
                        if let Some(waker) = data.waker {
                            waker.wake();
                        }
                    }
                    OpStatus::Completed(_) => panic!("uring operation completed more than once"),
                }
            }
        }
    }

    fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {
        let mut ring = self.ring.lock();

        let op = ring
            .ops
            .get_mut(token.0)
            .expect("`get_result` called on unknown operation");
        match op {
            OpStatus::Nop => panic!("`get_result` called on nop"),
            OpStatus::Pending(data) => {
                if data.canceled {
                    panic!("`get_result` called on canceled operation");
                }
                data.waker = Some(cx.waker().clone());
                None
            }
            OpStatus::Completed(res) => {
                let out = res.take();
                ring.ops.remove(token.0);
                Some(out.expect("Missing result in completed operation"))
            }
        }
    }

    // Remove the waker for the given token if it hasn't fired yet.
    fn cancel_operation(&self, token: WakerToken) {
        let mut ring = self.ring.lock();
        let submit_cancel = if let Some(op) = ring.ops.get_mut(token.0) {
            match op {
                OpStatus::Nop => panic!("`cancel_operation` called on nop"),
                OpStatus::Pending(data) => {
                    if data.canceled {
                        panic!("uring operation canceled more than once");
                    }

                    if let Some(waker) = data.waker.take() {
                        waker.wake();
                    }
                    // Clear the waker as it is no longer needed.
                    data.waker = None;
                    data.canceled = true;

                    // Keep the rest of the op data, as the uring might still be accessing either
                    // the source or the backing memory, so it needs to live until the kernel
                    // completes the operation.
                    true
                }
                OpStatus::Completed(_) => {
                    ring.ops.remove(token.0);
                    false
                }
            }
        } else {
            false
        };
        std::mem::drop(ring);
        if submit_cancel {
            let _best_effort = self.submit_cancel_async(token.0);
        }
    }

    fn register_source(&self, f: Arc<File>) -> usize {
        self.ring.lock().registered_sources.insert(f)
    }

    fn deregister_source(&self, source: &RegisteredSource) {
        // There isn't any need to pull pending ops out; they all have Arcs to the file and memory
        // they need, so let them complete. Deregistering a source with pending ops is not a common
        // path, so there is no need to optimize that case yet.
        self.ring.lock().registered_sources.remove(source.tag);
    }

    fn submit_poll(
        &self,
        source: &RegisteredSource,
        events: base::EventType,
    ) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_poll_fd(src.as_raw_descriptor(), events, usize_to_u64(next_op_token))
            .map_err(Error::SubmittingOp)?;
        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_fallocate(
        &self,
        source: &RegisteredSource,
        offset: u64,
        len: u64,
        mode: u32,
    ) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_fallocate(
                src.as_raw_descriptor(),
                offset,
                len,
                mode,
                usize_to_u64(next_op_token),
            )
            .map_err(Error::SubmittingOp)?;

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_cancel_async(&self, token: usize) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .async_cancel(usize_to_u64(token), usize_to_u64(next_op_token))
            .map_err(Error::SubmittingOp)?;

        entry.insert(OpStatus::Nop);

        Ok(WakerToken(next_op_token))
    }

    fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();
        self.ctx
            .add_fsync(src.as_raw_descriptor(), usize_to_u64(next_op_token))
            .map_err(Error::SubmittingOp)?;
        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: None,
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_read_to_vectored(
        &self,
        source: &RegisteredSource,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        offset: Option<u64>,
        addrs: &[MemRegion],
    ) -> Result<WakerToken> {
        if addrs
            .iter()
            .any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
        {
            return Err(Error::InvalidOffset);
        }

        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;

        // We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();

        // The addresses have already been validated above, so unwrapping them will succeed.
        let iovecs = addrs.iter().map(|&mem_range| {
            *mem.get_volatile_slice(mem_range)
                .unwrap()
                .as_iobuf()
                .as_ref()
        });

        unsafe {
            // Safe because all the addresses are within the backing memory and an Arc to that
            // memory is held for the duration of the operation, ensuring it remains valid while
            // the kernel accesses it. Tested by the `dont_drop_backing_mem_read` unit test.
            self.ctx
                .add_readv_iter(
                    iovecs,
                    src.as_raw_descriptor(),
                    offset,
                    usize_to_u64(next_op_token),
                )
                .map_err(Error::SubmittingOp)?;
        }

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: Some(mem),
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }

    fn submit_write_from_vectored(
        &self,
        source: &RegisteredSource,
        mem: Arc<dyn BackingMemory + Send + Sync>,
        offset: Option<u64>,
        addrs: &[MemRegion],
    ) -> Result<WakerToken> {
        if addrs
            .iter()
            .any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
        {
            return Err(Error::InvalidOffset);
        }

        let mut ring = self.ring.lock();
        let src = ring
            .registered_sources
            .get(source.tag)
            .map(Arc::clone)
            .ok_or(Error::InvalidSource)?;

        // We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
        let entry = ring.ops.vacant_entry();
        let next_op_token = entry.key();

        // The addresses have already been validated above, so unwrapping them will succeed.
        let iovecs = addrs.iter().map(|&mem_range| {
            *mem.get_volatile_slice(mem_range)
                .unwrap()
                .as_iobuf()
                .as_ref()
        });

        unsafe {
            // Safe because all the addresses are within the backing memory and an Arc to that
            // memory is held for the duration of the operation, ensuring it remains valid while
            // the kernel accesses it. Tested by the `dont_drop_backing_mem_write` unit test.
            self.ctx
                .add_writev_iter(
                    iovecs,
                    src.as_raw_descriptor(),
                    offset,
                    usize_to_u64(next_op_token),
                )
                .map_err(Error::SubmittingOp)?;
        }

        entry.insert(OpStatus::Pending(OpData {
            _file: src,
            _mem: Some(mem),
            waker: None,
            canceled: false,
        }));

        Ok(WakerToken(next_op_token))
    }
}

impl AsRawDescriptor for RawExecutor {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.ctx.as_raw_descriptor()
    }
}

impl WeakWake for RawExecutor {
    fn wake_by_ref(weak_self: &Weak<Self>) {
        if let Some(arc_self) = weak_self.upgrade() {
            RawExecutor::wake(&arc_self);
        }
    }
}

impl Drop for RawExecutor {
    fn drop(&mut self) {
        // Submit cancellations for all pending operations.
        #[allow(clippy::needless_collect)]
        let ops: Vec<_> = self
            .ring
            .get_mut()
            .ops
            .iter_mut()
            .filter_map(|op| match op.1 {
                OpStatus::Pending(data) if !data.canceled => Some(op.0),
                _ => None,
            })
            .collect();
        for token in ops {
            self.cancel_operation(WakerToken(token));
        }

        // Since the RawExecutor is wrapped in an Arc it may end up being dropped from a different
        // thread than the one that called `run` or `run_until`. Since we know there are no other
        // references, just clear the thread id so that we don't panic.
        *self.thread_id.lock() = None;

        // Now run the executor loop once more to poll any futures we just woke up.
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let res = self.run(
            &mut cx,
            // Make sure all pending uring operations are completed, as the kernel may still
            // write to memory that we are about to drop.
            futures::future::poll_fn(|_cx| {
                if self.ring.lock().ops.is_empty() {
                    Poll::Ready(())
                } else {
                    Poll::Pending
                }
            }),
        );

        if let Err(e) = res {
            warn!("Failed to drive uring to completion: {}", e);
        }
    }
}

pub struct UringExecutorTaskHandle<R> {
    task: Task<R>,
    raw: Weak<RawExecutor>,
}

impl<R: Send + 'static> UringExecutorTaskHandle<R> {
    pub fn detach(self) {
        if let Some(raw) = self.raw.upgrade() {
            raw.detached_tasks.lock().push(self.task);
        }
    }
}

impl<R: 'static> Future for UringExecutorTaskHandle<R> {
    type Output = R;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context,
    ) -> std::task::Poll<Self::Output> {
        Pin::new(&mut self.task).poll(cx)
    }
}

/// An executor that uses io_uring for its asynchronous I/O operations. See the documentation of
/// `Executor` for more details about the methods.
#[derive(Clone)]
pub struct URingExecutor {
    raw: Arc<RawExecutor>,
}

impl URingExecutor {
    pub fn new() -> Result<URingExecutor> {
        let raw = RawExecutor::new().map(Arc::new)?;

        Ok(URingExecutor { raw })
    }

    pub fn spawn<F>(&self, f: F) -> UringExecutorTaskHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.raw.spawn(f)
    }

    pub fn spawn_local<F>(&self, f: F) -> UringExecutorTaskHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        self.raw.spawn_local(f)
    }

    pub fn spawn_blocking<F, R>(&self, f: F) -> UringExecutorTaskHandle<R>
    where
        F: FnOnce() -> R + Send + 'static,
        R: Send + 'static,
    {
        self.raw.spawn_blocking(f)
    }

    pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
        let waker = new_waker(Arc::downgrade(&self.raw));
        let mut ctx = Context::from_waker(&waker);
        self.raw.run(&mut ctx, f)
    }

    /// Registers the given source (file descriptor) with this executor so it can be used for
    /// asynchronous operations. The descriptor is duplicated, so the caller may close the original.
    pub(crate) fn register_source<F: AsRawDescriptor>(&self, fd: &F) -> Result<RegisteredSource> {
        let duped_fd = unsafe {
            // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
            // will only be added to the poll loop.
            File::from_raw_fd(dup_fd(fd.as_raw_descriptor())?)
        };

        Ok(RegisteredSource {
            tag: self.raw.register_source(Arc::new(duped_fd)),
            ex: Arc::downgrade(&self.raw),
        })
    }
}

impl AsRawDescriptor for URingExecutor {
    fn as_raw_descriptor(&self) -> RawDescriptor {
        self.raw.as_raw_descriptor()
    }
}

// Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while
// waiting in TLS to be added to the main polling context.
unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
    let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
    if ret < 0 {
        Err(Error::DuplicatingFd(base::Error::last()))
    } else {
        Ok(ret)
    }
}

// Converts a `usize` into a `u64` and panics if the conversion fails.
#[inline]
fn usize_to_u64(val: usize) -> u64 {
    val.try_into().expect("`usize` doesn't fit inside a `u64`")
}

pub struct PendingOperation {
    waker_token: Option<WakerToken>,
    ex: Weak<RawExecutor>,
    submitted: bool,
}

impl Future for PendingOperation {
    type Output = Result<u32>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let token = self
            .waker_token
            .as_ref()
            .expect("PendingOperation polled after returning Poll::Ready");
        if let Some(ex) = self.ex.upgrade() {
            if let Some(result) = ex.get_result(token, cx) {
                self.waker_token = None;
                Poll::Ready(result.map_err(Error::Io))
            } else {
                // If we haven't submitted the operation yet, and the executor runs on a different
                // thread then submit it now. Otherwise the executor will submit it automatically
                // the next time it calls UringContext::wait.
                if !self.submitted && !ex.runs_tasks_on_current_thread() {
                    match ex.ctx.submit() {
                        Ok(()) => self.submitted = true,
                        // If the kernel ring is full then wait until some ops are removed from the
                        // completion queue. This op should get submitted the next time the executor
                        // calls UringContext::wait.
                        Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
                        Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
                    }
                }
                Poll::Pending
            }
        } else {
            Poll::Ready(Err(Error::ExecutorGone))
        }
    }
}

impl Drop for PendingOperation {
    fn drop(&mut self) {
        if let Some(waker_token) = self.waker_token.take() {
            if let Some(ex) = self.ex.upgrade() {
                ex.cancel_operation(waker_token);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::io::Read;
    use std::io::Write;
    use std::mem;
    use std::pin::Pin;
    use std::rc::Rc;
    use std::task::Context;
    use std::task::Poll;

    use futures::executor::block_on;

    use super::*;
    use crate::mem::BackingMemory;
    use crate::mem::MemRegion;
    use crate::mem::VecIoWrapper;

    // A future that returns ready when the uring queue is empty.
    struct UringQueueEmpty<'a> {
        ex: &'a URingExecutor,
    }

    impl<'a> Future for UringQueueEmpty<'a> {
        type Output = ();

        fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
            if self.ex.raw.ring.lock().ops.is_empty() {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        }
    }

    #[test]
    fn dont_drop_backing_mem_read() {
        if !is_uring_stable() {
            return;
        }

        // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
        // op is pending.
        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;

        // Use pipes to create a future that will block forever.
        let (rx, mut tx) = base::pipe(true).unwrap();

        // Set up the TLS for the uring_executor by creating one.
        let ex = URingExecutor::new().unwrap();

        // Register the receive side of the pipe with the executor.
        let registered_source = ex.register_source(&rx).expect("register source failed");

        // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
        // duration of the op.
        let pending_op = registered_source
            .start_read_to_mem(None, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start read to mem");

        // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
        // reference while the op is active.
        assert_eq!(Arc::strong_count(&bm), 2);

        // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be
        // using it.
        drop(pending_op);
        assert_eq!(Arc::strong_count(&bm), 2);

        // Finishing the operation should put the Arc count back to 1. Write to the pipe to wake
        // the read side and then wait for the uring result in the executor.
        tx.write_all(&[0u8; 8]).expect("write failed");
        ex.run_until(UringQueueEmpty { ex: &ex })
            .expect("Failed to wait for read pipe ready");
        assert_eq!(Arc::strong_count(&bm), 1);
    }

    #[test]
    fn dont_drop_backing_mem_write() {
        if !is_uring_stable() {
            return;
        }

        // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
        // op is pending.
        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;

        // Use a full pipe to create a future that will block forever.
        let (mut rx, tx) = base::new_pipe_full().expect("Pipe failed");

        // Set up the TLS for the uring_executor by creating one.
        let ex = URingExecutor::new().unwrap();

        // Register the send side of the pipe with the executor.
        let registered_source = ex.register_source(&tx).expect("register source failed");

        // Submit the op to the kernel. Next, test that the source keeps its Arc open for the
        // duration of the op.
        let pending_op = registered_source
            .start_write_from_mem(None, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start write to mem");

        // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
        // reference while the op is active.
        assert_eq!(Arc::strong_count(&bm), 2);

        // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be
        // using it.
        drop(pending_op);
        assert_eq!(Arc::strong_count(&bm), 2);

        // Finishing the operation should put the Arc count back to 1. Read from the pipe to make
        // room for the blocked write and then wait for the uring result in the executor.
        let mut buf = vec![0u8; base::round_up_to_page_size(1)];
        rx.read_exact(&mut buf).expect("read to empty failed");
        ex.run_until(UringQueueEmpty { ex: &ex })
            .expect("Failed to wait for write pipe ready");
        assert_eq!(Arc::strong_count(&bm), 1);
    }

    #[test]
    fn canceled_before_completion() {
        if !is_uring_stable() {
            return;
        }

        async fn cancel_io(op: PendingOperation) {
            mem::drop(op);
        }

        async fn check_result(op: PendingOperation, expected: u32) {
            let actual = op.await.expect("operation failed to complete");
            assert_eq!(expected, actual);
        }

        let bm =
            Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;

        let (rx, tx) = base::pipe(true).expect("Pipe failed");

        let ex = URingExecutor::new().unwrap();

        let rx_source = ex.register_source(&rx).expect("register source failed");
        let tx_source = ex.register_source(&tx).expect("register source failed");

        let read_task = rx_source
            .start_read_to_mem(None, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start read to mem");

        ex.spawn_local(cancel_io(read_task)).detach();

        // Write to the pipe so that the kernel operation will complete.
        let buf =
            Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
        let write_task = tx_source
            .start_write_from_mem(None, Arc::clone(&buf), &[MemRegion { offset: 0, len: 8 }])
            .expect("failed to start write from mem");

        ex.run_until(check_result(write_task, 8))
            .expect("Failed to run executor");
    }

    // We drain all ops on drop, so it's not guaranteed that the operation won't finish.
    #[ignore]
    #[test]
    fn drop_before_completion() {
        if !is_uring_stable() {
            return;
        }

        const VALUE: u64 = 0xef6c_a8df_b842_eb9c;

        async fn check_op(op: PendingOperation) {
            let err = op.await.expect_err("Op completed successfully");
            match err {
                Error::ExecutorGone => {}
                e => panic!("Unexpected error from op: {}", e),
            }
        }

        let (mut rx, mut tx) = base::pipe(true).expect("Pipe failed");

        let ex = URingExecutor::new().unwrap();

        let tx_source = ex.register_source(&tx).expect("Failed to register source");
        let bm = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));
        let op = tx_source
            .start_write_from_mem(
                None,
                bm,
                &[MemRegion {
                    offset: 0,
                    len: mem::size_of::<u64>(),
                }],
            )
            .expect("Failed to start write from mem");

        ex.spawn_local(check_op(op)).detach();

        // Now drop the executor. It shouldn't run the write operation.
        mem::drop(ex);

        // Make sure the executor did not complete the uring operation.
        let new_val = [0x2e; 8];
        tx.write_all(&new_val).unwrap();

        let mut buf = 0u64.to_ne_bytes();
        rx.read_exact(&mut buf[..])
            .expect("Failed to read from pipe");

        assert_eq!(buf, new_val);
    }

    // Dropping a task that owns a BlockingPool shouldn't leak the pool.
    #[test]
    fn drop_detached_blocking_pool() {
        if !is_uring_stable() {
            return;
        }

        struct Cleanup(BlockingPool);

        impl Drop for Cleanup {
            fn drop(&mut self) {
                // Make sure we shut down cleanly (BlockingPool::drop just prints a warning).
                self.0
                    .shutdown(Some(
                        std::time::Instant::now() + std::time::Duration::from_secs(1),
                    ))
                    .unwrap();
            }
        }

        let rc = Rc::new(std::cell::Cell::new(0));
        {
            let ex = URingExecutor::new().unwrap();
            let rc_clone = rc.clone();
            ex.spawn_local(async move {
                rc_clone.set(1);
                let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
                let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
                // Spawn a blocking task.
                let blocking_task = pool.0.spawn(move || {
                    // Rendezvous.
                    assert_eq!(recv.recv(), Ok(()));
                    // Wait for drop.
                    assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
                });
                // Make sure it has actually started (using a "rendezvous channel" send).
                //
                // Without this step, we'll have a race where we can shut down the blocking pool
                // before the worker thread pops off the task.
                send.send(()).unwrap();
                // Wait for it to finish.
                blocking_task.await;
                rc_clone.set(2);
            })
            .detach();
            ex.run_until(async {}).unwrap();
            // `ex` is dropped here. If everything is working as expected, it should drop all of
            // its tasks, including `send` and `pool` (in that order, which is important). `pool`'s
            // `Drop` impl will try to join all the worker threads, which should work because the
            // send half of the channel is closed.
        }
        assert_eq!(rc.get(), 1);
        Rc::try_unwrap(rc).expect("Rc had too many refs");
    }

    #[test]
    fn drop_on_different_thread() {
        if !is_uring_stable() {
            return;
        }

        let ex = URingExecutor::new().unwrap();

        let ex2 = ex.clone();
        let t = thread::spawn(move || ex2.run_until(async {}));

        t.join().unwrap().unwrap();

        // Leave an uncompleted operation in the queue so that the drop impl will try to drive it
        // to completion.
        let (_rx, tx) = base::pipe(true).expect("Pipe failed");
        let tx = ex.register_source(&tx).expect("Failed to register source");
        let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
        let op = tx
            .start_write_from_mem(
                None,
                bm,
                &[MemRegion {
                    offset: 0,
                    len: mem::size_of::<u64>(),
                }],
            )
            .expect("Failed to start write from mem");

        mem::drop(ex);

        match block_on(op).expect_err("Pending operation completed after executor was dropped") {
            Error::ExecutorGone => {}
            e => panic!("Unexpected error after dropping executor: {}", e),
        }
    }
}