• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::loom::sync::atomic::AtomicUsize;
2 use crate::runtime::io::ScheduledIo;
3 use crate::util::linked_list::{self, LinkedList};
4 
5 use std::io;
6 use std::ptr::NonNull;
7 use std::sync::atomic::Ordering::{Acquire, Release};
8 use std::sync::Arc;
9 
10 pub(super) struct RegistrationSet {
11     num_pending_release: AtomicUsize,
12 }
13 
14 pub(super) struct Synced {
15     // True when the I/O driver shutdown. At this point, no more registrations
16     // should be added to the set.
17     is_shutdown: bool,
18 
19     // List of all registrations tracked by the set
20     registrations: LinkedList<Arc<ScheduledIo>, ScheduledIo>,
21 
22     // Registrations that are pending drop. When a `Registration` is dropped, it
23     // stores its `ScheduledIo` in this list. The I/O driver is responsible for
24     // dropping it. This ensures the `ScheduledIo` is not freed while it can
25     // still be included in an I/O event.
26     pending_release: Vec<Arc<ScheduledIo>>,
27 }
28 
29 impl RegistrationSet {
new() -> (RegistrationSet, Synced)30     pub(super) fn new() -> (RegistrationSet, Synced) {
31         let set = RegistrationSet {
32             num_pending_release: AtomicUsize::new(0),
33         };
34 
35         let synced = Synced {
36             is_shutdown: false,
37             registrations: LinkedList::new(),
38             pending_release: Vec::with_capacity(16),
39         };
40 
41         (set, synced)
42     }
43 
is_shutdown(&self, synced: &Synced) -> bool44     pub(super) fn is_shutdown(&self, synced: &Synced) -> bool {
45         synced.is_shutdown
46     }
47 
48     /// Returns `true` if there are registrations that need to be released
needs_release(&self) -> bool49     pub(super) fn needs_release(&self) -> bool {
50         self.num_pending_release.load(Acquire) != 0
51     }
52 
allocate(&self, synced: &mut Synced) -> io::Result<Arc<ScheduledIo>>53     pub(super) fn allocate(&self, synced: &mut Synced) -> io::Result<Arc<ScheduledIo>> {
54         if synced.is_shutdown {
55             return Err(io::Error::new(
56                 io::ErrorKind::Other,
57                 crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
58             ));
59         }
60 
61         let ret = Arc::new(ScheduledIo::default());
62 
63         // Push a ref into the list of all resources.
64         synced.registrations.push_front(ret.clone());
65 
66         Ok(ret)
67     }
68 
69     // Returns `true` if the caller should unblock the I/O driver to purge
70     // registrations pending release.
deregister(&self, synced: &mut Synced, registration: &Arc<ScheduledIo>) -> bool71     pub(super) fn deregister(&self, synced: &mut Synced, registration: &Arc<ScheduledIo>) -> bool {
72         // Kind of arbitrary, but buffering 16 `ScheduledIo`s doesn't seem like much
73         const NOTIFY_AFTER: usize = 16;
74 
75         synced.pending_release.push(registration.clone());
76 
77         let len = synced.pending_release.len();
78         self.num_pending_release.store(len, Release);
79 
80         len == NOTIFY_AFTER
81     }
82 
shutdown(&self, synced: &mut Synced) -> Vec<Arc<ScheduledIo>>83     pub(super) fn shutdown(&self, synced: &mut Synced) -> Vec<Arc<ScheduledIo>> {
84         if synced.is_shutdown {
85             return vec![];
86         }
87 
88         synced.is_shutdown = true;
89         synced.pending_release.clear();
90 
91         // Building a vec of all outstanding I/O handles could be expensive, but
92         // this is the shutdown operation. In theory, shutdowns should be
93         // "clean" with no outstanding I/O resources. Even if it is slow, we
94         // aren't optimizing for shutdown.
95         let mut ret = vec![];
96 
97         while let Some(io) = synced.registrations.pop_back() {
98             ret.push(io);
99         }
100 
101         ret
102     }
103 
release(&self, synced: &mut Synced)104     pub(super) fn release(&self, synced: &mut Synced) {
105         for io in synced.pending_release.drain(..) {
106             // safety: the registration is part of our list
107             let _ = unsafe { synced.registrations.remove(io.as_ref().into()) };
108         }
109 
110         self.num_pending_release.store(0, Release);
111     }
112 }
113 
114 // Safety: `Arc` pins the inner data
115 unsafe impl linked_list::Link for Arc<ScheduledIo> {
116     type Handle = Arc<ScheduledIo>;
117     type Target = ScheduledIo;
118 
as_raw(handle: &Self::Handle) -> NonNull<ScheduledIo>119     fn as_raw(handle: &Self::Handle) -> NonNull<ScheduledIo> {
120         // safety: Arc::as_ptr never returns null
121         unsafe { NonNull::new_unchecked(Arc::as_ptr(handle) as *mut _) }
122     }
123 
from_raw(ptr: NonNull<Self::Target>) -> Arc<ScheduledIo>124     unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Arc<ScheduledIo> {
125         // safety: the linked list currently owns a ref count
126         unsafe { Arc::from_raw(ptr.as_ptr() as *const _) }
127     }
128 
pointers( target: NonNull<Self::Target>, ) -> NonNull<linked_list::Pointers<ScheduledIo>>129     unsafe fn pointers(
130         target: NonNull<Self::Target>,
131     ) -> NonNull<linked_list::Pointers<ScheduledIo>> {
132         NonNull::new_unchecked(target.as_ref().linked_list_pointers.get())
133     }
134 }
135