1 // Copyright 2020 The ChromiumOS Authors
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 //! `URingExecutor`
6 //!
7 //! The executor runs all given futures to completion. Futures register wakers associated with
8 //! io_uring operations. A waker is called when the set of uring ops the waker is waiting on
9 //! completes.
10 //!
11 //! `URingExecutor` is meant to be used with the `futures-rs` crate that provides combinators and
12 //! utility functions to combine futures. In general, the interface provided by `URingExecutor`
13 //! shouldn't be used directly. Instead, use it by interacting with implementors of `IoSource`,
14 //! and the high-level future functions.
15 //!
16 //!
17 //! ## Read/Write buffer management.
18 //!
19 //! There are two key issues in managing asynchronous IO buffers in Rust:
20 //! 1) The kernel has a mutable reference to the memory until the completion is returned. Rust must
21 //! not have any references to it during that time.
22 //! 2) The memory must remain valid as long as the kernel has a reference to it.
23 //!
24 //! ### The kernel's mutable borrow of the buffer
25 //!
26 //! Because the buffers used for read and write must be passed to the kernel for an unknown
27 //! duration, the functions must maintain ownership of the memory.  The core of this problem is that
28 //! the lifetime of the future isn't tied to the scope in which the kernel can modify the buffer the
29 //! future has a reference to.  The buffer can be modified at any point from submission until the
30 //! operation completes. The operation can't be synchronously canceled when the future is dropped,
31 //! and Drop can't be used for safety guarantees. To ensure this never happens, only memory that
32 //! implements `BackingMemory` is accepted.  For implementors of `BackingMemory` the mut borrow
33 //! isn't an issue because those are already OK with external modifications to the memory (like a
34 //! `VolatileSlice`).
35 //!
36 //! ### Buffer lifetime
37 //!
38 //! What if the kernel's reference to the buffer outlives the buffer itself?  This could happen if a
39 //! read operation is submitted and then the memory is dropped.  To solve this, the executor takes an
40 //! Arc to the backing memory. Vecs being read to are also wrapped in an Arc before being passed to
41 //! the executor.  The executor holds the Arc and ensures all operations are complete before dropping
42 //! it, which guarantees the memory is valid for the duration.
43 //!
44 //! The buffers _have_ to be on the heap. Because we don't have a way to cancel a future if it is
45 //! dropped (we can't rely on drop running), there is no way to ensure the kernel's buffer remains valid
46 //! until the operation completes unless the executor holds an Arc to the memory on the heap.
47 //!
48 //! ## Using `Vec` for reads/writes.
49 //!
50 //! There is a convenience wrapper `VecIoWrapper` provided for fully owned vectors. This type
51 //! ensures that only the kernel is allowed to access the `Vec` and wraps the `Vec` in an Arc to
52 //! ensure it lives long enough.
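//!
//! ## Example
//!
//! A minimal, illustrative sketch of a read (marked `ignore` because it relies on
//! crate-internal APIs and a kernel with io_uring support; `file` stands in for any value
//! implementing `AsRawDescriptor`):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! let ex = URingExecutor::new()?;
//! // Wrap the buffer so the executor can hold an `Arc` to it while the kernel uses it.
//! let buf = Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
//! let source = ex.register_source(&file)?;
//! let op = source.start_read_to_mem(None, Arc::clone(&buf), &[MemRegion { offset: 0, len: 4096 }])?;
//! // The executor keeps the `Arc` alive until the kernel completes the read.
//! let bytes_read = ex.run_until(op)??;
//! ```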
53 
54 use std::convert::TryInto;
55 use std::ffi::CStr;
56 use std::fs::File;
57 use std::future::Future;
58 use std::io;
59 use std::mem;
60 use std::mem::MaybeUninit;
61 use std::os::unix::io::FromRawFd;
62 use std::os::unix::io::RawFd;
63 use std::pin::Pin;
64 use std::sync::atomic::AtomicI32;
65 use std::sync::atomic::Ordering;
66 use std::sync::Arc;
67 use std::sync::Weak;
68 use std::task::Context;
69 use std::task::Poll;
70 use std::task::Waker;
71 use std::thread;
72 use std::thread::ThreadId;
73 
74 use async_task::Task;
75 use base::trace;
76 use base::warn;
77 use base::AsRawDescriptor;
78 use base::EventType;
79 use base::RawDescriptor;
80 use futures::task::noop_waker;
81 use io_uring::URingAllowlist;
82 use io_uring::URingContext;
83 use io_uring::URingOperation;
84 use once_cell::sync::Lazy;
85 use pin_utils::pin_mut;
86 use remain::sorted;
87 use slab::Slab;
88 use sync::Mutex;
89 use thiserror::Error as ThisError;
90 
91 use crate::mem::BackingMemory;
92 use crate::mem::MemRegion;
93 use crate::queue::RunnableQueue;
94 use crate::waker::new_waker;
95 use crate::waker::WakerToken;
96 use crate::waker::WeakWake;
97 use crate::BlockingPool;
98 use crate::DetachedTasks;
99 
100 #[sorted]
101 #[derive(Debug, ThisError)]
102 pub enum Error {
103     /// Creating a context to wait on FDs failed.
104     #[error("Error creating the fd waiting context: {0}")]
105     CreatingContext(io_uring::Error),
106     /// Failed to copy the FD for the polling context.
107     #[error("Failed to copy the FD for the polling context: {0}")]
108     DuplicatingFd(base::Error),
109     /// Enabling a context failed.
110     #[error("Error enabling the URing context: {0}")]
111     EnablingContext(io_uring::Error),
112     /// The Executor is gone.
113     #[error("The URingExecutor is gone")]
114     ExecutorGone,
115     /// Invalid offset or length given for an iovec in backing memory.
116     #[error("Invalid offset/len for getting an iovec")]
117     InvalidOffset,
118     /// Invalid FD source specified.
119     #[error("Invalid source, FD not registered for use")]
120     InvalidSource,
121     /// Error doing the IO.
122     #[error("Error during IO: {0}")]
123     Io(io::Error),
124     /// Registering operation restrictions to a uring failed.
125     #[error("Error registering restrictions to the URing context: {0}")]
126     RegisteringURingRestriction(io_uring::Error),
127     /// Failed to remove the waker from the polling context.
128     #[error("Error removing from the URing context: {0}")]
129     RemovingWaker(io_uring::Error),
130     /// Failed to submit the operation to the polling context.
131     #[error("Error adding to the URing context: {0}")]
132     SubmittingOp(io_uring::Error),
133     /// URingContext failure.
134     #[error("URingContext failure: {0}")]
135     URingContextError(io_uring::Error),
136     /// Failed to submit or wait for io_uring events.
137     #[error("URing::enter: {0}")]
138     URingEnter(io_uring::Error),
139 }
140 pub type Result<T> = std::result::Result<T, Error>;
141 
142 impl From<Error> for io::Error {
143     fn from(e: Error) -> Self {
144         use Error::*;
145         match e {
146             DuplicatingFd(e) => e.into(),
147             ExecutorGone => io::Error::new(io::ErrorKind::Other, ExecutorGone),
148             InvalidOffset => io::Error::new(io::ErrorKind::InvalidInput, InvalidOffset),
149             InvalidSource => io::Error::new(io::ErrorKind::InvalidData, InvalidSource),
150             Io(e) => e,
151             CreatingContext(e) => e.into(),
152             RemovingWaker(e) => e.into(),
153             SubmittingOp(e) => e.into(),
154             URingContextError(e) => e.into(),
155             URingEnter(e) => e.into(),
156             EnablingContext(e) => e.into(),
157             RegisteringURingRestriction(e) => e.into(),
158         }
159     }
160 }
161 
162 static IS_URING_STABLE: Lazy<bool> = Lazy::new(|| {
163     let mut utsname = MaybeUninit::zeroed();
164 
165     // Safe because this will only modify `utsname` and we check the return value.
166     let res = unsafe { libc::uname(utsname.as_mut_ptr()) };
167     if res < 0 {
168         return false;
169     }
170 
171     // Safe because the kernel has initialized `utsname`.
172     let utsname = unsafe { utsname.assume_init() };
173 
174     // Safe because the pointer is valid and the kernel guarantees that this is a valid C string.
175     let release = unsafe { CStr::from_ptr(utsname.release.as_ptr()) };
176 
177     let mut components = match release.to_str().map(|r| r.split('.').map(str::parse)) {
178         Ok(c) => c,
179         Err(_) => return false,
180     };
181 
182     // Kernels older than 5.10 either didn't support io_uring or had bugs in the implementation.
183     match (components.next(), components.next()) {
184         (Some(Ok(major)), Some(Ok(minor))) if (major, minor) >= (5, 10) => {
185             // The kernel version is new enough so check if we can actually make a uring context.
186             URingContext::new(8, None).is_ok()
187         }
188         _ => false,
189     }
190 });
191 
192 // Checks if the uring executor is stable.
193 // Caches the result so that the check is only run once.
194 // Useful for falling back to the FD executor on pre-uring kernels.
195 pub(crate) fn is_uring_stable() -> bool {
196     *IS_URING_STABLE
197 }
198 
199 // Checks uring availability by checking whether uring creation succeeds.
200 // If uring creation succeeds, it returns `Ok(())`. It returns a `URingContextError` otherwise.
201 // It fails if the kernel does not support io_uring, but note that failure is not limited to that cause.
202 pub(crate) fn check_uring_availability() -> Result<()> {
203     URingContext::new(8, None)
204         .map(drop)
205         .map_err(Error::URingContextError)
206 }
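
// A minimal usage sketch (hypothetical caller code, not part of this module) of how these
// checks might be used when choosing an executor backend:
//
//     if is_uring_stable() {
//         // Safe to construct a `URingExecutor`.
//     } else {
//         // Fall back to the FD executor; `check_uring_availability()` reports why uring
//         // creation failed, if a reason is needed for logging.
//     }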
207 
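/// A descriptor that has been registered with a `URingExecutor` via
/// `URingExecutor::register_source`. Provides methods for starting asynchronous uring
/// operations (reads, writes, fsync, fallocate, and polling) on the underlying FD; each method
/// returns a `PendingOperation` future that resolves when the kernel completes the operation.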
208 pub struct RegisteredSource {
209     tag: usize,
210     ex: Weak<RawExecutor>,
211 }
212 
213 impl RegisteredSource {
214     pub fn start_read_to_mem(
215         &self,
216         file_offset: Option<u64>,
217         mem: Arc<dyn BackingMemory + Send + Sync>,
218         addrs: &[MemRegion],
219     ) -> Result<PendingOperation> {
220         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
221         let token = ex.submit_read_to_vectored(self, mem, file_offset, addrs)?;
222 
223         Ok(PendingOperation {
224             waker_token: Some(token),
225             ex: self.ex.clone(),
226             submitted: false,
227         })
228     }
229 
230     pub fn start_write_from_mem(
231         &self,
232         file_offset: Option<u64>,
233         mem: Arc<dyn BackingMemory + Send + Sync>,
234         addrs: &[MemRegion],
235     ) -> Result<PendingOperation> {
236         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
237         let token = ex.submit_write_from_vectored(self, mem, file_offset, addrs)?;
238 
239         Ok(PendingOperation {
240             waker_token: Some(token),
241             ex: self.ex.clone(),
242             submitted: false,
243         })
244     }
245 
246     pub fn start_fallocate(&self, offset: u64, len: u64, mode: u32) -> Result<PendingOperation> {
247         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
248         let token = ex.submit_fallocate(self, offset, len, mode)?;
249 
250         Ok(PendingOperation {
251             waker_token: Some(token),
252             ex: self.ex.clone(),
253             submitted: false,
254         })
255     }
256 
257     pub fn start_fsync(&self) -> Result<PendingOperation> {
258         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
259         let token = ex.submit_fsync(self)?;
260 
261         Ok(PendingOperation {
262             waker_token: Some(token),
263             ex: self.ex.clone(),
264             submitted: false,
265         })
266     }
267 
268     pub fn poll_fd_readable(&self) -> Result<PendingOperation> {
269         let events = EventType::Read;
270 
271         let ex = self.ex.upgrade().ok_or(Error::ExecutorGone)?;
272         let token = ex.submit_poll(self, events)?;
273 
274         Ok(PendingOperation {
275             waker_token: Some(token),
276             ex: self.ex.clone(),
277             submitted: false,
278         })
279     }
280 }
281 
282 impl Drop for RegisteredSource {
283     fn drop(&mut self) {
284         if let Some(ex) = self.ex.upgrade() {
285             ex.deregister_source(self);
286         }
287     }
288 }
289 
290 // Indicates that the executor is either within or about to make an io_uring_enter syscall. When a
291 // waker sees this value, it will add and submit a NOP to the uring, which will wake up the thread
292 // blocked on the io_uring_enter syscall.
293 const WAITING: i32 = 0xb80d_21b5u32 as i32;
294 
295 // Indicates that the executor is processing any futures that are ready to run.
296 const PROCESSING: i32 = 0xdb31_83a3u32 as i32;
297 
298 // Indicates that one or more futures may be ready to make progress.
299 const WOKEN: i32 = 0x0fc7_8f7eu32 as i32;
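
// A summary of the state transitions, as implemented by `RawExecutor::wake` and
// `RawExecutor::run` below:
//
//   PROCESSING -> WAITING   `run` has no more runnable work and is about to call io_uring_enter.
//   WAITING    -> WOKEN     `wake` saw WAITING and submits a NOP so that io_uring_enter returns.
//   PROCESSING -> WOKEN     `wake` ran while futures were being polled; `run` simply loops
//                           again without waiting.
//   WAITING/WOKEN -> PROCESSING   `run` at the top of each loop iteration and right after
//                                 io_uring_enter returns.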
300 
301 // Number of entries in the ring.
302 const NUM_ENTRIES: usize = 256;
303 
304 // An operation that has been submitted to the uring and is potentially being waited on.
305 struct OpData {
306     _file: Arc<File>,
307     _mem: Option<Arc<dyn BackingMemory + Send + Sync>>,
308     waker: Option<Waker>,
309     canceled: bool,
310 }
311 
312 // The current status of an operation that's been submitted to the uring.
313 enum OpStatus {
314     Nop,
315     Pending(OpData),
316     Completed(Option<::std::io::Result<u32>>),
317 }
318 
319 struct Ring {
320     ops: Slab<OpStatus>,
321     registered_sources: Slab<Arc<File>>,
322 }
323 
324 struct RawExecutor {
325     // The URingContext needs to be first so that it is dropped first, closing the uring fd, and
326     // releasing the resources borrowed by the kernel before we free them.
327     ctx: URingContext,
328     queue: RunnableQueue,
329     ring: Mutex<Ring>,
330     blocking_pool: BlockingPool,
331     thread_id: Mutex<Option<ThreadId>>,
332     state: AtomicI32,
333     detached_tasks: Mutex<DetachedTasks>,
334 }
335 
336 impl RawExecutor {
337     fn new() -> Result<RawExecutor> {
338         // Only allow the operations that the RawExecutor actually submits, to enhance security.
339         let mut restrictions = URingAllowlist::new();
340         let ops = [
341             URingOperation::Writev,
342             URingOperation::Readv,
343             URingOperation::Nop,
344             URingOperation::Fsync,
345             URingOperation::Fallocate,
346             URingOperation::PollAdd,
347             URingOperation::PollRemove,
348             URingOperation::AsyncCancel,
349         ];
350         for op in ops {
351             restrictions.allow_submit_operation(op);
352         }
353 
354         let ctx =
355             URingContext::new(NUM_ENTRIES, Some(&restrictions)).map_err(Error::CreatingContext)?;
356 
357         Ok(RawExecutor {
358             ctx,
359             queue: RunnableQueue::new(),
360             ring: Mutex::new(Ring {
361                 ops: Slab::with_capacity(NUM_ENTRIES),
362                 registered_sources: Slab::with_capacity(NUM_ENTRIES),
363             }),
364             blocking_pool: Default::default(),
365             thread_id: Mutex::new(None),
366             state: AtomicI32::new(PROCESSING),
367             detached_tasks: Mutex::new(DetachedTasks::new()),
368         })
369     }
370 
371     fn wake(&self) {
372         let oldstate = self.state.swap(WOKEN, Ordering::Release);
373         if oldstate == WAITING {
374             let mut ring = self.ring.lock();
375             let entry = ring.ops.vacant_entry();
376             let next_op_token = entry.key();
377             if let Err(e) = self.ctx.add_nop(usize_to_u64(next_op_token)) {
378                 warn!("Failed to add NOP for waking up executor: {}", e);
379             }
380             entry.insert(OpStatus::Nop);
381             mem::drop(ring);
382 
383             match self.ctx.submit() {
384                 Ok(()) => {}
385                 // If the kernel's submit ring is full then we know we won't block when calling
386                 // io_uring_enter, which is all we really care about.
387                 Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
388                 Err(e) => warn!("Failed to submit NOP for waking up executor: {}", e),
389             }
390         }
391     }
392 
393     fn spawn<F>(self: &Arc<Self>, f: F) -> UringExecutorTaskHandle<F::Output>
394     where
395         F: Future + Send + 'static,
396         F::Output: Send + 'static,
397     {
398         let raw = Arc::downgrade(self);
399         let schedule = move |runnable| {
400             if let Some(r) = raw.upgrade() {
401                 r.queue.push_back(runnable);
402                 r.wake();
403             }
404         };
405         let (runnable, task) = async_task::spawn(f, schedule);
406         runnable.schedule();
407         UringExecutorTaskHandle {
408             task,
409             raw: Arc::downgrade(self),
410         }
411     }
412 
413     fn spawn_local<F>(self: &Arc<Self>, f: F) -> UringExecutorTaskHandle<F::Output>
414     where
415         F: Future + 'static,
416         F::Output: 'static,
417     {
418         let raw = Arc::downgrade(self);
419         let schedule = move |runnable| {
420             if let Some(r) = raw.upgrade() {
421                 r.queue.push_back(runnable);
422                 r.wake();
423             }
424         };
425         let (runnable, task) = async_task::spawn_local(f, schedule);
426         runnable.schedule();
427         UringExecutorTaskHandle {
428             task,
429             raw: Arc::downgrade(self),
430         }
431     }
432 
433     fn spawn_blocking<F, R>(self: &Arc<Self>, f: F) -> UringExecutorTaskHandle<R>
434     where
435         F: FnOnce() -> R + Send + 'static,
436         R: Send + 'static,
437     {
438         self.spawn(self.blocking_pool.spawn(f))
439     }
440 
441     fn runs_tasks_on_current_thread(&self) -> bool {
442         let executor_thread = self.thread_id.lock();
443         executor_thread
444             .map(|id| id == thread::current().id())
445             .unwrap_or(false)
446     }
447 
448     fn run<F: Future>(&self, cx: &mut Context, done: F) -> Result<F::Output> {
449         let current_thread = thread::current().id();
450         let mut thread_id = self.thread_id.lock();
451         assert_eq!(
452             *thread_id.get_or_insert(current_thread),
453             current_thread,
454             "`URingExecutor::run` cannot be called from more than one thread"
455         );
456         mem::drop(thread_id);
457 
458         pin_mut!(done);
459         loop {
460             self.state.store(PROCESSING, Ordering::Release);
461             for runnable in self.queue.iter() {
462                 runnable.run();
463             }
464 
465             if let Ok(mut tasks) = self.detached_tasks.try_lock() {
466                 tasks.poll(cx);
467             }
468 
469             if let Poll::Ready(val) = done.as_mut().poll(cx) {
470                 return Ok(val);
471             }
472 
473             let oldstate = self.state.compare_exchange(
474                 PROCESSING,
475                 WAITING,
476                 Ordering::Acquire,
477                 Ordering::Acquire,
478             );
479             if let Err(oldstate) = oldstate {
480                 debug_assert_eq!(oldstate, WOKEN);
481                 // One or more futures have become runnable.
482                 continue;
483             }
484 
485             trace!(
486                 "Waiting on events, {} pending ops",
487                 self.ring.lock().ops.len()
488             );
489             let events = self.ctx.wait().map_err(Error::URingEnter)?;
490 
491             // Set the state back to PROCESSING to prevent any tasks woken up by the loop below from
492             // submitting an unnecessary wake-up NOP to the uring.
493             self.state.store(PROCESSING, Ordering::Release);
494 
495             let mut ring = self.ring.lock();
496             for (raw_token, result) in events {
497                 // While the `expect()` might fail on arbitrary `u64`s, the `raw_token` was
498                 // something that we originally gave to the kernel and that was created from a
499                 // `usize` so we should always be able to convert it back into a `usize`.
500                 let token = raw_token
501                     .try_into()
502                     .expect("`u64` doesn't fit inside a `usize`");
503 
504                 let op = ring
505                     .ops
506                     .get_mut(token)
507                     .expect("Received completion token for unexpected operation");
508                 match mem::replace(op, OpStatus::Completed(Some(result))) {
509                     // No one is waiting on a Nop.
510                     OpStatus::Nop => mem::drop(ring.ops.remove(token)),
511                     OpStatus::Pending(data) => {
512                         if data.canceled {
513                             // No one is waiting for this operation and the uring is done with
514                             // it so it's safe to remove.
515                             ring.ops.remove(token);
516                         }
517                         if let Some(waker) = data.waker {
518                             waker.wake();
519                         }
520                     }
521                     OpStatus::Completed(_) => panic!("uring operation completed more than once"),
522                 }
523             }
524         }
525     }
526 
527     fn get_result(&self, token: &WakerToken, cx: &mut Context) -> Option<io::Result<u32>> {
528         let mut ring = self.ring.lock();
529 
530         let op = ring
531             .ops
532             .get_mut(token.0)
533             .expect("`get_result` called on unknown operation");
534         match op {
535             OpStatus::Nop => panic!("`get_result` called on nop"),
536             OpStatus::Pending(data) => {
537                 if data.canceled {
538                     panic!("`get_result` called on canceled operation");
539                 }
540                 data.waker = Some(cx.waker().clone());
541                 None
542             }
543             OpStatus::Completed(res) => {
544                 let out = res.take();
545                 ring.ops.remove(token.0);
546                 Some(out.expect("Missing result in completed operation"))
547             }
548         }
549     }
550 
551     // Remove the waker for the given token if it hasn't fired yet.
552     fn cancel_operation(&self, token: WakerToken) {
553         let mut ring = self.ring.lock();
554         let submit_cancel = if let Some(op) = ring.ops.get_mut(token.0) {
555             match op {
556                 OpStatus::Nop => panic!("`cancel_operation` called on nop"),
557                 OpStatus::Pending(data) => {
558                     if data.canceled {
559                         panic!("uring operation canceled more than once");
560                     }
561 
562                     if let Some(waker) = data.waker.take() {
563                         waker.wake();
564                     }
565                     // Clear the waker as it is no longer needed.
566                     data.waker = None;
567                     data.canceled = true;
568 
569                     // Keep the rest of the op data as the uring might still be accessing either
570                     // the source or the backing memory, so they need to live until the kernel
571                     // completes the operation.
572                     true
573                 }
574                 OpStatus::Completed(_) => {
575                     ring.ops.remove(token.0);
576                     false
577                 }
578             }
579         } else {
580             false
581         };
582         std::mem::drop(ring);
583         if submit_cancel {
584             let _best_effort = self.submit_cancel_async(token.0);
585         }
586     }
587 
588     fn register_source(&self, f: Arc<File>) -> usize {
589         self.ring.lock().registered_sources.insert(f)
590     }
591 
592     fn deregister_source(&self, source: &RegisteredSource) {
593         // There isn't any need to pull pending ops out; they all have Arcs to the file and mem they
594         // need, so let them complete. Deregistering with pending ops is not a common path, so there
595         // is no need to optimize that case yet.
596         self.ring.lock().registered_sources.remove(source.tag);
597     }
598 
599     fn submit_poll(
600         &self,
601         source: &RegisteredSource,
602         events: base::EventType,
603     ) -> Result<WakerToken> {
604         let mut ring = self.ring.lock();
605         let src = ring
606             .registered_sources
607             .get(source.tag)
608             .map(Arc::clone)
609             .ok_or(Error::InvalidSource)?;
610         let entry = ring.ops.vacant_entry();
611         let next_op_token = entry.key();
612         self.ctx
613             .add_poll_fd(src.as_raw_descriptor(), events, usize_to_u64(next_op_token))
614             .map_err(Error::SubmittingOp)?;
615         entry.insert(OpStatus::Pending(OpData {
616             _file: src,
617             _mem: None,
618             waker: None,
619             canceled: false,
620         }));
621 
622         Ok(WakerToken(next_op_token))
623     }
624 
625     fn submit_fallocate(
626         &self,
627         source: &RegisteredSource,
628         offset: u64,
629         len: u64,
630         mode: u32,
631     ) -> Result<WakerToken> {
632         let mut ring = self.ring.lock();
633         let src = ring
634             .registered_sources
635             .get(source.tag)
636             .map(Arc::clone)
637             .ok_or(Error::InvalidSource)?;
638         let entry = ring.ops.vacant_entry();
639         let next_op_token = entry.key();
640         self.ctx
641             .add_fallocate(
642                 src.as_raw_descriptor(),
643                 offset,
644                 len,
645                 mode,
646                 usize_to_u64(next_op_token),
647             )
648             .map_err(Error::SubmittingOp)?;
649 
650         entry.insert(OpStatus::Pending(OpData {
651             _file: src,
652             _mem: None,
653             waker: None,
654             canceled: false,
655         }));
656 
657         Ok(WakerToken(next_op_token))
658     }
659 
660     fn submit_cancel_async(&self, token: usize) -> Result<WakerToken> {
661         let mut ring = self.ring.lock();
662         let entry = ring.ops.vacant_entry();
663         let next_op_token = entry.key();
664         self.ctx
665             .async_cancel(usize_to_u64(token), usize_to_u64(next_op_token))
666             .map_err(Error::SubmittingOp)?;
667 
668         entry.insert(OpStatus::Nop);
669 
670         Ok(WakerToken(next_op_token))
671     }
672 
673     fn submit_fsync(&self, source: &RegisteredSource) -> Result<WakerToken> {
674         let mut ring = self.ring.lock();
675         let src = ring
676             .registered_sources
677             .get(source.tag)
678             .map(Arc::clone)
679             .ok_or(Error::InvalidSource)?;
680         let entry = ring.ops.vacant_entry();
681         let next_op_token = entry.key();
682         self.ctx
683             .add_fsync(src.as_raw_descriptor(), usize_to_u64(next_op_token))
684             .map_err(Error::SubmittingOp)?;
685         entry.insert(OpStatus::Pending(OpData {
686             _file: src,
687             _mem: None,
688             waker: None,
689             canceled: false,
690         }));
691 
692         Ok(WakerToken(next_op_token))
693     }
694 
695     fn submit_read_to_vectored(
696         &self,
697         source: &RegisteredSource,
698         mem: Arc<dyn BackingMemory + Send + Sync>,
699         offset: Option<u64>,
700         addrs: &[MemRegion],
701     ) -> Result<WakerToken> {
702         if addrs
703             .iter()
704             .any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
705         {
706             return Err(Error::InvalidOffset);
707         }
708 
709         let mut ring = self.ring.lock();
710         let src = ring
711             .registered_sources
712             .get(source.tag)
713             .map(Arc::clone)
714             .ok_or(Error::InvalidSource)?;
715 
716         // We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
717         let entry = ring.ops.vacant_entry();
718         let next_op_token = entry.key();
719 
720         // The addresses have already been validated above, so unwrapping the volatile slices
721         // here will succeed.
722         let iovecs = addrs.iter().map(|&mem_range| {
723             *mem.get_volatile_slice(mem_range)
724                 .unwrap()
725                 .as_iobuf()
726                 .as_ref()
727         });
728 
729         unsafe {
730             // Safe because all the addresses are within the Memory that an Arc is kept for the
731             // duration to ensure the memory is valid while the kernel accesses it.
732             // Tested by `dont_drop_backing_mem_read` unit test.
733             self.ctx
734                 .add_readv_iter(
735                     iovecs,
736                     src.as_raw_descriptor(),
737                     offset,
738                     usize_to_u64(next_op_token),
739                 )
740                 .map_err(Error::SubmittingOp)?;
741         }
742 
743         entry.insert(OpStatus::Pending(OpData {
744             _file: src,
745             _mem: Some(mem),
746             waker: None,
747             canceled: false,
748         }));
749 
750         Ok(WakerToken(next_op_token))
751     }
752 
753     fn submit_write_from_vectored(
754         &self,
755         source: &RegisteredSource,
756         mem: Arc<dyn BackingMemory + Send + Sync>,
757         offset: Option<u64>,
758         addrs: &[MemRegion],
759     ) -> Result<WakerToken> {
760         if addrs
761             .iter()
762             .any(|&mem_range| mem.get_volatile_slice(mem_range).is_err())
763         {
764             return Err(Error::InvalidOffset);
765         }
766 
767         let mut ring = self.ring.lock();
768         let src = ring
769             .registered_sources
770             .get(source.tag)
771             .map(Arc::clone)
772             .ok_or(Error::InvalidSource)?;
773 
774         // We can't insert the OpData into the slab yet because `iovecs` borrows `mem` below.
775         let entry = ring.ops.vacant_entry();
776         let next_op_token = entry.key();
777 
778         // The addresses have already been validated above, so unwrapping the volatile slices
779         // here will succeed.
780         let iovecs = addrs.iter().map(|&mem_range| {
781             *mem.get_volatile_slice(mem_range)
782                 .unwrap()
783                 .as_iobuf()
784                 .as_ref()
785         });
786 
787         unsafe {
788             // Safe because all the addresses are within the Memory that an Arc is kept for the
789             // duration to ensure the memory is valid while the kernel accesses it.
790             // Tested by `dont_drop_backing_mem_write` unit test.
791             self.ctx
792                 .add_writev_iter(
793                     iovecs,
794                     src.as_raw_descriptor(),
795                     offset,
796                     usize_to_u64(next_op_token),
797                 )
798                 .map_err(Error::SubmittingOp)?;
799         }
800 
801         entry.insert(OpStatus::Pending(OpData {
802             _file: src,
803             _mem: Some(mem),
804             waker: None,
805             canceled: false,
806         }));
807 
808         Ok(WakerToken(next_op_token))
809     }
810 }
811 
812 impl AsRawDescriptor for RawExecutor {
813     fn as_raw_descriptor(&self) -> RawDescriptor {
814         self.ctx.as_raw_descriptor()
815     }
816 }
817 
818 impl WeakWake for RawExecutor {
819     fn wake_by_ref(weak_self: &Weak<Self>) {
820         if let Some(arc_self) = weak_self.upgrade() {
821             RawExecutor::wake(&arc_self);
822         }
823     }
824 }
825 
826 impl Drop for RawExecutor {
827     fn drop(&mut self) {
828         // Submit cancellations for all operations
829         #[allow(clippy::needless_collect)]
830         let ops: Vec<_> = self
831             .ring
832             .get_mut()
833             .ops
834             .iter_mut()
835             .filter_map(|op| match op.1 {
836                 OpStatus::Pending(data) if !data.canceled => Some(op.0),
837                 _ => None,
838             })
839             .collect();
840         for token in ops {
841             self.cancel_operation(WakerToken(token));
842         }
843 
844         // Since the RawExecutor is wrapped in an Arc it may end up being dropped from a different
845         // thread than the one that called `run` or `run_until`. Since we know there are no other
846         // references, just clear the thread id so that we don't panic.
847         *self.thread_id.lock() = None;
848 
849         // Now run the executor loop once more to poll any futures we just woke up.
850         let waker = noop_waker();
851         let mut cx = Context::from_waker(&waker);
852         let res = self.run(
853             &mut cx,
854             // make sure all pending uring operations are completed as kernel may
855             // try to write to memory that we may drop
856             futures::future::poll_fn(|_cx| {
857                 if self.ring.lock().ops.is_empty() {
858                     Poll::Ready(())
859                 } else {
860                     Poll::Pending
861                 }
862             }),
863         );
864 
865         if let Err(e) = res {
866             warn!("Failed to drive uring to completion: {}", e);
867         }
868     }
869 }
870 
871 pub struct UringExecutorTaskHandle<R> {
872     task: Task<R>,
873     raw: Weak<RawExecutor>,
874 }
875 
876 impl<R: Send + 'static> UringExecutorTaskHandle<R> {
877     pub fn detach(self) {
878         if let Some(raw) = self.raw.upgrade() {
879             raw.detached_tasks.lock().push(self.task);
880         }
881     }
882 }
883 
884 impl<R: 'static> Future for UringExecutorTaskHandle<R> {
885     type Output = R;
886 
887     fn poll(
888         mut self: std::pin::Pin<&mut Self>,
889         cx: &mut std::task::Context,
890     ) -> std::task::Poll<Self::Output> {
891         Pin::new(&mut self.task).poll(cx)
892     }
893 }
894 
895 /// An executor that uses io_uring for its asynchronous I/O operations. See the documentation of
896 /// `Executor` for more details about the methods.
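///
/// A minimal usage sketch (marked `ignore` because it needs a kernel with io_uring support):
///
/// ```ignore
/// let ex = URingExecutor::new()?;
/// // Spawn a task, then drive the executor until that task completes.
/// let task = ex.spawn(async { 2 + 2 });
/// assert_eq!(ex.run_until(task)?, 4);
/// ```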
897 #[derive(Clone)]
898 pub struct URingExecutor {
899     raw: Arc<RawExecutor>,
900 }
901 
902 impl URingExecutor {
903     pub fn new() -> Result<URingExecutor> {
904         let raw = RawExecutor::new().map(Arc::new)?;
905 
906         Ok(URingExecutor { raw })
907     }
908 
909     pub fn spawn<F>(&self, f: F) -> UringExecutorTaskHandle<F::Output>
910     where
911         F: Future + Send + 'static,
912         F::Output: Send + 'static,
913     {
914         self.raw.spawn(f)
915     }
916 
917     pub fn spawn_local<F>(&self, f: F) -> UringExecutorTaskHandle<F::Output>
918     where
919         F: Future + 'static,
920         F::Output: 'static,
921     {
922         self.raw.spawn_local(f)
923     }
924 
925     pub fn spawn_blocking<F, R>(&self, f: F) -> UringExecutorTaskHandle<R>
926     where
927         F: FnOnce() -> R + Send + 'static,
928         R: Send + 'static,
929     {
930         self.raw.spawn_blocking(f)
931     }
932 
933     pub fn run_until<F: Future>(&self, f: F) -> Result<F::Output> {
934         let waker = new_waker(Arc::downgrade(&self.raw));
935         let mut ctx = Context::from_waker(&waker);
936         self.raw.run(&mut ctx, f)
937     }
938 
939     /// Register a file and memory pair for buffered asynchronous operation.
940     pub(crate) fn register_source<F: AsRawDescriptor>(&self, fd: &F) -> Result<RegisteredSource> {
941         let duped_fd = unsafe {
942             // Safe because duplicating an FD doesn't affect memory safety, and the dup'd FD
943             // will only be added to the poll loop.
944             File::from_raw_fd(dup_fd(fd.as_raw_descriptor())?)
945         };
946 
947         Ok(RegisteredSource {
948             tag: self.raw.register_source(Arc::new(duped_fd)),
949             ex: Arc::downgrade(&self.raw),
950         })
951     }
952 }
953 
954 impl AsRawDescriptor for URingExecutor {
955     fn as_raw_descriptor(&self) -> RawDescriptor {
956         self.raw.as_raw_descriptor()
957     }
958 }
959 
960 // Used to dup the FDs passed to the executor so there is a guarantee they aren't closed while
961 // waiting in TLS to be added to the main polling context.
962 unsafe fn dup_fd(fd: RawFd) -> Result<RawFd> {
963     let ret = libc::fcntl(fd, libc::F_DUPFD_CLOEXEC, 0);
964     if ret < 0 {
965         Err(Error::DuplicatingFd(base::Error::last()))
966     } else {
967         Ok(ret)
968     }
969 }
970 
971 // Converts a `usize` into a `u64` and panics if the conversion fails.
972 #[inline]
973 fn usize_to_u64(val: usize) -> u64 {
974     val.try_into().expect("`usize` doesn't fit inside a `u64`")
975 }
976 
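/// A future for a single operation that has been submitted to the uring via a
/// `RegisteredSource`. It resolves to the operation's result once the kernel reports the
/// completion. Dropping a `PendingOperation` before completion asks the executor to cancel the
/// operation, while the executor keeps the backing memory alive until the kernel is done with it.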
977 pub struct PendingOperation {
978     waker_token: Option<WakerToken>,
979     ex: Weak<RawExecutor>,
980     submitted: bool,
981 }
982 
983 impl Future for PendingOperation {
984     type Output = Result<u32>;
985 
986     fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
987         let token = self
988             .waker_token
989             .as_ref()
990             .expect("PendingOperation polled after returning Poll::Ready");
991         if let Some(ex) = self.ex.upgrade() {
992             if let Some(result) = ex.get_result(token, cx) {
993                 self.waker_token = None;
994                 Poll::Ready(result.map_err(Error::Io))
995             } else {
996                 // If we haven't submitted the operation yet, and the executor runs on a different
997                 // thread then submit it now. Otherwise the executor will submit it automatically
998                 // the next time it calls UringContext::wait.
999                 if !self.submitted && !ex.runs_tasks_on_current_thread() {
1000                     match ex.ctx.submit() {
1001                         Ok(()) => self.submitted = true,
1002                         // If the kernel ring is full then wait until some ops are removed from the
1003                         // completion queue. This op should get submitted the next time the executor
1004                         // calls UringContext::wait.
1005                         Err(io_uring::Error::RingEnter(libc::EBUSY)) => {}
1006                         Err(e) => return Poll::Ready(Err(Error::URingEnter(e))),
1007                     }
1008                 }
1009                 Poll::Pending
1010             }
1011         } else {
1012             Poll::Ready(Err(Error::ExecutorGone))
1013         }
1014     }
1015 }
1016 
1017 impl Drop for PendingOperation {
1018     fn drop(&mut self) {
1019         if let Some(waker_token) = self.waker_token.take() {
1020             if let Some(ex) = self.ex.upgrade() {
1021                 ex.cancel_operation(waker_token);
1022             }
1023         }
1024     }
1025 }
1026 
1027 #[cfg(test)]
1028 mod tests {
1029     use std::future::Future;
1030     use std::io::Read;
1031     use std::io::Write;
1032     use std::mem;
1033     use std::pin::Pin;
1034     use std::rc::Rc;
1035     use std::task::Context;
1036     use std::task::Poll;
1037 
1038     use futures::executor::block_on;
1039 
1040     use super::*;
1041     use crate::mem::BackingMemory;
1042     use crate::mem::MemRegion;
1043     use crate::mem::VecIoWrapper;
1044 
1045     // A future that returns ready when the uring queue is empty.
1046     struct UringQueueEmpty<'a> {
1047         ex: &'a URingExecutor,
1048     }
1049 
1050     impl<'a> Future for UringQueueEmpty<'a> {
1051         type Output = ();
1052 
1053         fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Self::Output> {
1054             if self.ex.raw.ring.lock().ops.is_empty() {
1055                 Poll::Ready(())
1056             } else {
1057                 Poll::Pending
1058             }
1059         }
1060     }
1061 
1062     #[test]
1063     fn dont_drop_backing_mem_read() {
1064         if !is_uring_stable() {
1065             return;
1066         }
1067 
1068         // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
1069         // op is pending.
1070         let bm =
1071             Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
1072 
1073         // Use pipes to create a future that will block forever.
1074         let (rx, mut tx) = base::pipe(true).unwrap();
1075 
1076         // Set up the TLS for the uring_executor by creating one.
1077         let ex = URingExecutor::new().unwrap();
1078 
1079         // Register the receive side of the pipe with the executor.
1080         let registered_source = ex.register_source(&rx).expect("register source failed");
1081 
1082         // Submit the op to the kernel. Next, test that the source keeps its Arc open for the duration
1083         // of the op.
1084         let pending_op = registered_source
1085             .start_read_to_mem(None, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
1086             .expect("failed to start read to mem");
1087 
1088         // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
1089         // reference while the op is active.
1090         assert_eq!(Arc::strong_count(&bm), 2);
1091 
1092         // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
1093         // it.
1094         drop(pending_op);
1095         assert_eq!(Arc::strong_count(&bm), 2);
1096 
1097         // Finishing the operation should put the Arc count back to 1.
1098         // write to the pipe to wake the read pipe and then wait for the uring result in the
1099         // executor.
1100         tx.write_all(&[0u8; 8]).expect("write failed");
1101         ex.run_until(UringQueueEmpty { ex: &ex })
1102             .expect("Failed to wait for read pipe ready");
1103         assert_eq!(Arc::strong_count(&bm), 1);
1104     }
1105 
1106     #[test]
1107     fn dont_drop_backing_mem_write() {
1108         if !is_uring_stable() {
1109             return;
1110         }
1111 
1112         // Create a backing memory wrapped in an Arc and check that the drop isn't called while the
1113         // op is pending.
1114         let bm =
1115             Arc::new(VecIoWrapper::from(vec![0u8; 4096])) as Arc<dyn BackingMemory + Send + Sync>;
1116 
1117         // Use pipes to create a future that will block forever.
1118         let (mut rx, tx) = base::new_pipe_full().expect("Pipe failed");
1119 
1120         // Set up the TLS for the uring_executor by creating one.
1121         let ex = URingExecutor::new().unwrap();
1122 
1123         // Register the receive side of the pipe with the executor.
1124         let registered_source = ex.register_source(&tx).expect("register source failed");
1125 
1126         // Submit the op to the kernel. Next, test that the source keeps its Arc open for the duration
1127         // of the op.
1128         let pending_op = registered_source
1129             .start_write_from_mem(None, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
1130             .expect("failed to start write to mem");
1131 
1132         // Here the Arc count must be two, one for `bm` and one to signify that the kernel has a
1133         // reference while the op is active.
1134         assert_eq!(Arc::strong_count(&bm), 2);
1135 
1136         // Dropping the operation shouldn't reduce the Arc count, as the kernel could still be using
1137         // it.
1138         drop(pending_op);
1139         assert_eq!(Arc::strong_count(&bm), 2);
1140 
1141         // Finishing the operation should put the Arc count back to 1.
1142         // Read from the full pipe to unblock the write and then wait for the uring result in the
1143         // executor.
1144         let mut buf = vec![0u8; base::round_up_to_page_size(1)];
1145         rx.read_exact(&mut buf).expect("read to empty failed");
1146         ex.run_until(UringQueueEmpty { ex: &ex })
1147             .expect("Failed to wait for write pipe ready");
1148         assert_eq!(Arc::strong_count(&bm), 1);
1149     }
1150 
1151     #[test]
1152     fn canceled_before_completion() {
1153         if !is_uring_stable() {
1154             return;
1155         }
1156 
1157         async fn cancel_io(op: PendingOperation) {
1158             mem::drop(op);
1159         }
1160 
1161         async fn check_result(op: PendingOperation, expected: u32) {
1162             let actual = op.await.expect("operation failed to complete");
1163             assert_eq!(expected, actual);
1164         }
1165 
1166         let bm =
1167             Arc::new(VecIoWrapper::from(vec![0u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
1168 
1169         let (rx, tx) = base::pipe(true).expect("Pipe failed");
1170 
1171         let ex = URingExecutor::new().unwrap();
1172 
1173         let rx_source = ex.register_source(&rx).expect("register source failed");
1174         let tx_source = ex.register_source(&tx).expect("register source failed");
1175 
1176         let read_task = rx_source
1177             .start_read_to_mem(None, Arc::clone(&bm), &[MemRegion { offset: 0, len: 8 }])
1178             .expect("failed to start read to mem");
1179 
1180         ex.spawn_local(cancel_io(read_task)).detach();
1181 
1182         // Write to the pipe so that the kernel operation will complete.
1183         let buf =
1184             Arc::new(VecIoWrapper::from(vec![0xc2u8; 16])) as Arc<dyn BackingMemory + Send + Sync>;
1185         let write_task = tx_source
1186             .start_write_from_mem(None, Arc::clone(&buf), &[MemRegion { offset: 0, len: 8 }])
1187             .expect("failed to start write from mem");
1188 
1189         ex.run_until(check_result(write_task, 8))
1190             .expect("Failed to run executor");
1191     }
1192 
1193     // We will drain all ops on drop, and it's not guaranteed that the operation won't finish.
1194     #[ignore]
1195     #[test]
1196     fn drop_before_completion() {
1197         if !is_uring_stable() {
1198             return;
1199         }
1200 
1201         const VALUE: u64 = 0xef6c_a8df_b842_eb9c;
1202 
1203         async fn check_op(op: PendingOperation) {
1204             let err = op.await.expect_err("Op completed successfully");
1205             match err {
1206                 Error::ExecutorGone => {}
1207                 e => panic!("Unexpected error from op: {}", e),
1208             }
1209         }
1210 
1211         let (mut rx, mut tx) = base::pipe(true).expect("Pipe failed");
1212 
1213         let ex = URingExecutor::new().unwrap();
1214 
1215         let tx_source = ex.register_source(&tx).expect("Failed to register source");
1216         let bm = Arc::new(VecIoWrapper::from(VALUE.to_ne_bytes().to_vec()));
1217         let op = tx_source
1218             .start_write_from_mem(
1219                 None,
1220                 bm,
1221                 &[MemRegion {
1222                     offset: 0,
1223                     len: mem::size_of::<u64>(),
1224                 }],
1225             )
1226             .expect("Failed to start write from mem");
1227 
1228         ex.spawn_local(check_op(op)).detach();
1229 
1230         // Now drop the executor. It shouldn't run the write operation.
1231         mem::drop(ex);
1232 
1233         // Make sure the executor did not complete the uring operation.
1234         let new_val = [0x2e; 8];
1235         tx.write_all(&new_val).unwrap();
1236 
1237         let mut buf = 0u64.to_ne_bytes();
1238         rx.read_exact(&mut buf[..])
1239             .expect("Failed to read from pipe");
1240 
1241         assert_eq!(buf, new_val);
1242     }
1243 
1244     // Dropping a task that owns a BlockingPool shouldn't leak the pool.
1245     #[test]
1246     fn drop_detached_blocking_pool() {
1247         if !is_uring_stable() {
1248             return;
1249         }
1250 
1251         struct Cleanup(BlockingPool);
1252 
1253         impl Drop for Cleanup {
1254             fn drop(&mut self) {
1255                 // Make sure we shutdown cleanly (BlockingPool::drop just prints a warning).
1256                 self.0
1257                     .shutdown(Some(
1258                         std::time::Instant::now() + std::time::Duration::from_secs(1),
1259                     ))
1260                     .unwrap();
1261             }
1262         }
1263 
1264         let rc = Rc::new(std::cell::Cell::new(0));
1265         {
1266             let ex = URingExecutor::new().unwrap();
1267             let rc_clone = rc.clone();
1268             ex.spawn_local(async move {
1269                 rc_clone.set(1);
1270                 let pool = Cleanup(BlockingPool::new(1, std::time::Duration::new(60, 0)));
1271                 let (send, recv) = std::sync::mpsc::sync_channel::<()>(0);
1272                 // Spawn a blocking task.
1273                 let blocking_task = pool.0.spawn(move || {
1274                     // Rendezvous.
1275                     assert_eq!(recv.recv(), Ok(()));
1276                     // Wait for drop.
1277                     assert_eq!(recv.recv(), Err(std::sync::mpsc::RecvError));
1278                 });
1279                 // Make sure it has actually started (using a "rendezvous channel" send).
1280                 //
1281                 // Without this step, we'll have a race where we can shutdown the blocking pool
1282                 // before the worker thread pops off the task.
1283                 send.send(()).unwrap();
1284                 // Wait for it to finish
1285                 blocking_task.await;
1286                 rc_clone.set(2);
1287             })
1288             .detach();
1289             ex.run_until(async {}).unwrap();
1290             // `ex` is dropped here. If everything is working as expected, it should drop all of
1291             // its tasks, including `send` and `pool` (in that order, which is important). `pool`'s
1292             // `Drop` impl will try to join all the worker threads, which should work because send
1293             // half of the channel closed.
1294         }
1295         assert_eq!(rc.get(), 1);
1296         Rc::try_unwrap(rc).expect("Rc had too many refs");
1297     }
1298 
1299     #[test]
1300     fn drop_on_different_thread() {
1301         if !is_uring_stable() {
1302             return;
1303         }
1304 
1305         let ex = URingExecutor::new().unwrap();
1306 
1307         let ex2 = ex.clone();
1308         let t = thread::spawn(move || ex2.run_until(async {}));
1309 
1310         t.join().unwrap().unwrap();
1311 
1312         // Leave an uncompleted operation in the queue so that the drop impl will try to drive it to
1313         // completion.
1314         let (_rx, tx) = base::pipe(true).expect("Pipe failed");
1315         let tx = ex.register_source(&tx).expect("Failed to register source");
1316         let bm = Arc::new(VecIoWrapper::from(0xf2e96u64.to_ne_bytes().to_vec()));
1317         let op = tx
1318             .start_write_from_mem(
1319                 None,
1320                 bm,
1321                 &[MemRegion {
1322                     offset: 0,
1323                     len: mem::size_of::<u64>(),
1324                 }],
1325             )
1326             .expect("Failed to start write from mem");
1327 
1328         mem::drop(ex);
1329 
1330         match block_on(op).expect_err("Pending operation completed after executor was dropped") {
1331             Error::ExecutorGone => {}
1332             e => panic!("Unexpected error after dropping executor: {}", e),
1333         }
1334     }
1335 }
1336