1 use crate::loom::sync::atomic::AtomicUsize; 2 use crate::runtime::io::ScheduledIo; 3 use crate::util::linked_list::{self, LinkedList}; 4 5 use std::io; 6 use std::ptr::NonNull; 7 use std::sync::atomic::Ordering::{Acquire, Release}; 8 use std::sync::Arc; 9 10 pub(super) struct RegistrationSet { 11 num_pending_release: AtomicUsize, 12 } 13 14 pub(super) struct Synced { 15 // True when the I/O driver shutdown. At this point, no more registrations 16 // should be added to the set. 17 is_shutdown: bool, 18 19 // List of all registrations tracked by the set 20 registrations: LinkedList<Arc<ScheduledIo>, ScheduledIo>, 21 22 // Registrations that are pending drop. When a `Registration` is dropped, it 23 // stores its `ScheduledIo` in this list. The I/O driver is responsible for 24 // dropping it. This ensures the `ScheduledIo` is not freed while it can 25 // still be included in an I/O event. 26 pending_release: Vec<Arc<ScheduledIo>>, 27 } 28 29 impl RegistrationSet { new() -> (RegistrationSet, Synced)30 pub(super) fn new() -> (RegistrationSet, Synced) { 31 let set = RegistrationSet { 32 num_pending_release: AtomicUsize::new(0), 33 }; 34 35 let synced = Synced { 36 is_shutdown: false, 37 registrations: LinkedList::new(), 38 pending_release: Vec::with_capacity(16), 39 }; 40 41 (set, synced) 42 } 43 is_shutdown(&self, synced: &Synced) -> bool44 pub(super) fn is_shutdown(&self, synced: &Synced) -> bool { 45 synced.is_shutdown 46 } 47 48 /// Returns `true` if there are registrations that need to be released needs_release(&self) -> bool49 pub(super) fn needs_release(&self) -> bool { 50 self.num_pending_release.load(Acquire) != 0 51 } 52 allocate(&self, synced: &mut Synced) -> io::Result<Arc<ScheduledIo>>53 pub(super) fn allocate(&self, synced: &mut Synced) -> io::Result<Arc<ScheduledIo>> { 54 if synced.is_shutdown { 55 return Err(io::Error::new( 56 io::ErrorKind::Other, 57 crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, 58 )); 59 } 60 61 let ret = Arc::new(ScheduledIo::default()); 62 63 // Push a ref into the list of all resources. 64 synced.registrations.push_front(ret.clone()); 65 66 Ok(ret) 67 } 68 69 // Returns `true` if the caller should unblock the I/O driver to purge 70 // registrations pending release. deregister(&self, synced: &mut Synced, registration: &Arc<ScheduledIo>) -> bool71 pub(super) fn deregister(&self, synced: &mut Synced, registration: &Arc<ScheduledIo>) -> bool { 72 // Kind of arbitrary, but buffering 16 `ScheduledIo`s doesn't seem like much 73 const NOTIFY_AFTER: usize = 16; 74 75 synced.pending_release.push(registration.clone()); 76 77 let len = synced.pending_release.len(); 78 self.num_pending_release.store(len, Release); 79 80 len == NOTIFY_AFTER 81 } 82 shutdown(&self, synced: &mut Synced) -> Vec<Arc<ScheduledIo>>83 pub(super) fn shutdown(&self, synced: &mut Synced) -> Vec<Arc<ScheduledIo>> { 84 if synced.is_shutdown { 85 return vec![]; 86 } 87 88 synced.is_shutdown = true; 89 synced.pending_release.clear(); 90 91 // Building a vec of all outstanding I/O handles could be expensive, but 92 // this is the shutdown operation. In theory, shutdowns should be 93 // "clean" with no outstanding I/O resources. Even if it is slow, we 94 // aren't optimizing for shutdown. 95 let mut ret = vec![]; 96 97 while let Some(io) = synced.registrations.pop_back() { 98 ret.push(io); 99 } 100 101 ret 102 } 103 release(&self, synced: &mut Synced)104 pub(super) fn release(&self, synced: &mut Synced) { 105 for io in synced.pending_release.drain(..) { 106 // safety: the registration is part of our list 107 let _ = unsafe { synced.registrations.remove(io.as_ref().into()) }; 108 } 109 110 self.num_pending_release.store(0, Release); 111 } 112 } 113 114 // Safety: `Arc` pins the inner data 115 unsafe impl linked_list::Link for Arc<ScheduledIo> { 116 type Handle = Arc<ScheduledIo>; 117 type Target = ScheduledIo; 118 as_raw(handle: &Self::Handle) -> NonNull<ScheduledIo>119 fn as_raw(handle: &Self::Handle) -> NonNull<ScheduledIo> { 120 // safety: Arc::as_ptr never returns null 121 unsafe { NonNull::new_unchecked(Arc::as_ptr(handle) as *mut _) } 122 } 123 from_raw(ptr: NonNull<Self::Target>) -> Arc<ScheduledIo>124 unsafe fn from_raw(ptr: NonNull<Self::Target>) -> Arc<ScheduledIo> { 125 // safety: the linked list currently owns a ref count 126 unsafe { Arc::from_raw(ptr.as_ptr() as *const _) } 127 } 128 pointers( target: NonNull<Self::Target>, ) -> NonNull<linked_list::Pointers<ScheduledIo>>129 unsafe fn pointers( 130 target: NonNull<Self::Target>, 131 ) -> NonNull<linked_list::Pointers<ScheduledIo>> { 132 NonNull::new_unchecked(target.as_ref().linked_list_pointers.get()) 133 } 134 } 135