//! A linked list of debt nodes.
//!
//! A node may or may not be owned by a thread. Reader debts are allocated in the thread's owned
//! node, while a writer walks all the nodes (but may also use some owned values).
//!
//! The list is prepend-only ‒ if a thread dies, its node lives on (and can be claimed by another
//! thread later on). This makes the implementation much simpler, since everything here is
//! `'static` and we don't have to care about knowing when to free stuff.
//!
//! The nodes contain both the fast primary slots and the secondary fallback ones.
//!
//! # Synchronization
//!
//! We synchronize several things here.
//!
//! The addition of nodes is synchronized through the head (a load on each read, AcqRel on each
//! attempt to add another node). Note that certain parts of a node never change after it is
//! published (they aren't even atomic) and the other things that do change take care of
//! themselves (the debt slots have their own synchronization, etc).
//!
//! The ownership of a node follows an acquire-release lock pattern.
//!
//! Similarly, the counting of active writers follows an acquire-release lock pattern.
//!
//! We also do a release-acquire "send" from start-cooldown to check-cooldown to make sure we see
//! a value of the writers at least as up to date as when the cooldown started. That way, if we
//! see 0, we know it must have happened since then.

use core::cell::Cell;
use core::ptr;
use core::slice::Iter;
use core::sync::atomic::Ordering::*;
use core::sync::atomic::{AtomicPtr, AtomicUsize};

#[cfg(feature = "experimental-thread-local")]
use core::cell::OnceCell;

use alloc::boxed::Box;

use super::fast::{Local as FastLocal, Slots as FastSlots};
use super::helping::{Local as HelpingLocal, Slots as HelpingSlots};
use super::Debt;
use crate::RefCnt;

const NODE_UNUSED: usize = 0;
const NODE_USED: usize = 1;
const NODE_COOLDOWN: usize = 2;

/// The head of the debt linked list.
static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());

/// Keeps the node's writer count incremented for as long as it is alive.
pub struct NodeReservation<'a>(&'a Node);

impl Drop for NodeReservation<'_> {
    fn drop(&mut self) {
        self.0.active_writers.fetch_sub(1, Release);
    }
}

/// One thread-local node for debts.
#[repr(C, align(64))]
pub(crate) struct Node {
    fast: FastSlots,
    helping: HelpingSlots,
    in_use: AtomicUsize,
    // Next node in the list.
    //
    // It is a raw pointer because we touch it before synchronization (we don't _dereference_ it
    // before synchronization, only manipulate the pointer itself). That would be illegal under
    // the strict interpretation of the rules by MIRI on references.
    next: *const Node,
    active_writers: AtomicUsize,
}

impl Default for Node {
    fn default() -> Self {
        Node {
            fast: FastSlots::default(),
            helping: HelpingSlots::default(),
            in_use: AtomicUsize::new(NODE_USED),
            next: ptr::null(),
            active_writers: AtomicUsize::new(0),
        }
    }
}
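
// An illustrative sanity check (a sketch, not exhaustive): a freshly constructed node starts out
// owned by the constructing thread (NODE_USED), with no writers looking at it and no successor.
// `Node::default` on its own never touches LIST_HEAD.
#[cfg(test)]
#[test]
fn default_node_starts_used() {
    let node = Node::default();
    assert_eq!(NODE_USED, node.in_use.load(Relaxed));
    assert_eq!(0, node.active_writers.load(Relaxed));
    assert!(node.next.is_null());
}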

impl Node {
    /// Goes through the debt linked list.
    ///
    /// This traverses the linked list, calling the closure on each node. If the closure returns
    /// `Some`, it terminates with that value early, otherwise it runs to the end.
    pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> {
        // Acquire ‒ we want to make sure we read the correct version of the data at the end of
        // the pointer. Any write to LIST_HEAD is done with Release.
        //
        // Furthermore, we need to see the newest version of the list in case we examine the
        // debts ‒ if a new node was added recently, we don't want a stale read -> SeqCst.
        //
        // Note that the other pointers in the chain never change and are *ordinary* pointers. The
        // whole linked list is synchronized through the head.
        let mut current = unsafe { LIST_HEAD.load(SeqCst).as_ref() };
        while let Some(node) = current {
            let result = f(node);
            if result.is_some() {
                return result;
            }
            current = unsafe { node.next.as_ref() };
        }
        None
    }

    /// Puts the current thread's node into cooldown.
    fn start_cooldown(&self) {
        // Trick: Make sure we have an up to date value of active_writers in this thread, so we
        // can properly release it below.
        let _reservation = self.reserve_writer();
        assert_eq!(NODE_USED, self.in_use.swap(NODE_COOLDOWN, Release));
    }

    /// Performs a cooldown if the node is ready.
    ///
    /// See the ABA protection at the [helping] module.
    fn check_cooldown(&self) {
        // Check if the node is in cooldown, for two reasons:
        // * Skip most of the nodes fast, without dealing with them.
        // * More importantly, sync the value of active_writers to be at least as new as when the
        //   cooldown started. That way we know the 0 we observe happened some time after
        //   start_cooldown.
        if self.in_use.load(Acquire) == NODE_COOLDOWN {
            // The rest can be nicely relaxed ‒ no memory is being synchronized by these
            // operations. We just see an up to date 0 and allow someone (possibly us) to claim
            // the node later on.
            if self.active_writers.load(Relaxed) == 0 {
                let _ = self
                    .in_use
                    .compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed);
            }
        }
    }

    /// Marks this node as having a writer currently playing with it.
    pub fn reserve_writer(&self) -> NodeReservation {
        self.active_writers.fetch_add(1, Acquire);
        NodeReservation(self)
    }

    /// "Allocates" a node.
    ///
    /// Either a new one is created, or a previous one is reused. The node is claimed and marked
    /// as in use.
    fn get() -> &'static Self {
        // Try to find an unused one in the chain and reuse it.
        Self::traverse(|node| {
            node.check_cooldown();
            if node
                .in_use
                // We claim unique control over the generation and the right to write to the
                // slots if they are empty (`Debt::NONE`).
                .compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed)
                .is_ok()
            {
                Some(node)
            } else {
                None
            }
        })
        // If that didn't work, create a new one and prepend it to the list.
        .unwrap_or_else(|| {
            let node = Box::leak(Box::<Node>::default());
            node.helping.init();
            // We don't want to read any data in addition to the head, so Relaxed is fine here.
            //
            // We do need to release the data to others, but that is handled by the
            // compare_exchange below.
            let mut head = LIST_HEAD.load(Relaxed);
            loop {
                node.next = head;
                if let Err(old) = LIST_HEAD.compare_exchange_weak(
                    head,
                    node,
                    // We need to release *the whole chain* here. For that, we need to acquire it
                    // first.
                    //
                    // SeqCst because we need to make sure it is properly set "before" we do
                    // anything to the debts.
                    SeqCst,
                    Relaxed, // Nothing changed, go for the next round of the loop.
                ) {
                    head = old;
                } else {
                    return node;
                }
            }
        })
    }

    /// Iterates over the fast slots.
    pub(crate) fn fast_slots(&self) -> Iter<Debt> {
        self.fast.into_iter()
    }

    /// Accesses the helping slot.
    pub(crate) fn helping_slot(&self) -> &Debt {
        self.helping.slot()
    }
}
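
// An illustrative sanity check for the claiming logic above: a node obtained from `Node::get`
// should be reachable through `traverse`, since nodes are only ever prepended to the global list
// and never removed. The node is sent back into cooldown at the end, mirroring what
// `LocalNode::drop` does.
#[cfg(test)]
#[test]
fn claimed_node_is_in_list() {
    let mine = Node::get();
    let found = Node::traverse(|node| if ptr::eq(node, mine) { Some(()) } else { None });
    assert!(found.is_some());
    // Return the node so it can be reused by a later claim (possibly on another thread).
    mine.start_cooldown();
}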

/// A wrapper around a node pointer, to un-claim the node on thread shutdown.
pub(crate) struct LocalNode {
    /// The node for this thread, if any.
    ///
    /// We don't necessarily have to own one, but if we don't, we'll get one before the first use.
    node: Cell<Option<&'static Node>>,

    /// Thread-local data for the fast slots.
    fast: FastLocal,

    /// Thread-local data for the helping strategy.
    helping: HelpingLocal,
}

impl LocalNode {
    #[cfg(not(feature = "experimental-thread-local"))]
    pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
        let f = Cell::new(Some(f));
        THREAD_HEAD
            .try_with(|head| {
                if head.node.get().is_none() {
                    head.node.set(Some(Node::get()));
                }
                let f = f.take().unwrap();
                f(head)
            })
            // During application shutdown, the thread local storage may already be deallocated.
            // In that case, the above fails but we still need something. So we just find or
            // allocate a node and use it only once.
            //
            // Note that this situation should be very rare, so the slower performance doesn't
            // matter that much.
            .unwrap_or_else(|_| {
                let tmp_node = LocalNode {
                    node: Cell::new(Some(Node::get())),
                    fast: FastLocal::default(),
                    helping: HelpingLocal::default(),
                };
                let f = f.take().unwrap();
                f(&tmp_node)
                // Drop of tmp_node -> sends the node we just used into cooldown.
            })
    }

    #[cfg(feature = "experimental-thread-local")]
    pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
        let thread_head = THREAD_HEAD.get_or_init(|| LocalNode {
            node: Cell::new(None),
            fast: FastLocal::default(),
            helping: HelpingLocal::default(),
        });
        if thread_head.node.get().is_none() {
            thread_head.node.set(Some(Node::get()));
        }
        f(thread_head)
    }

    /// Creates a new debt.
    ///
    /// This stores the debt of the given pointer (untyped, cast into a usize) and returns a
    /// reference to that slot, or gives up with `None` if all the slots are currently full.
    #[inline]
    pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        node.fast.get_debt(ptr, &self.fast)
    }

    /// Initializes a helping slot transaction.
    ///
    /// Returns the generation (with tag).
    pub(crate) fn new_helping(&self, ptr: usize) -> usize {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        let (gen, discard) = node.helping.get_debt(ptr, &self.helping);
        if discard {
            // Too many generations have happened; make sure the writers give the poor node a
            // break for a while so they don't observe the generation wrapping around.
            node.start_cooldown();
            // Give up the node; the next operation will claim a fresh one.
            self.node.take();
        }
        gen
    }

    /// Confirms the helping transaction.
    ///
    /// The generation comes from a previous new_helping.
    ///
    /// Will either return a debt with the pointer, or a debt to pay and a replacement (already
    /// protected) address.
    pub(crate) fn confirm_helping(
        &self,
        gen: usize,
        ptr: usize,
    ) -> Result<&'static Debt, (&'static Debt, usize)> {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        let slot = node.helping_slot();
        node.helping
            .confirm(gen, ptr)
            .map(|()| slot)
            .map_err(|repl| (slot, repl))
    }

    /// The writer side of a helping slot.
    ///
    /// This potentially helps the `who` node (it uses self as the local node, which must be
    /// different) by loading the address that one is trying to load.
    pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R)
    where
        T: RefCnt,
        R: Fn() -> T,
    {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        node.helping.help(&who.helping, storage_addr, replacement)
    }
}

impl Drop for LocalNode {
    fn drop(&mut self) {
        if let Some(node) = self.node.get() {
            // Release ‒ synchronize the writes/ownership of this Node.
            node.start_cooldown();
        }
    }
}

#[cfg(not(feature = "experimental-thread-local"))]
thread_local! {
    /// A debt node assigned to this thread.
    static THREAD_HEAD: LocalNode = LocalNode {
        node: Cell::new(None),
        fast: FastLocal::default(),
        helping: HelpingLocal::default(),
    };
}

#[cfg(feature = "experimental-thread-local")]
#[thread_local]
/// A debt node assigned to this thread.
static THREAD_HEAD: OnceCell<LocalNode> = OnceCell::new();

#[cfg(test)]
mod tests {
    use super::*;

    impl Node {
        fn is_empty(&self) -> bool {
            self.fast_slots()
                .chain(core::iter::once(self.helping_slot()))
                .all(|d| d.0.load(Relaxed) == Debt::NONE)
        }

        fn get_thread() -> &'static Self {
            LocalNode::with(|h| h.node.get().unwrap())
        }
    }

    /// A freshly acquired thread local node is empty.
    #[test]
    fn new_empty() {
        assert!(Node::get_thread().is_empty());
    }
}
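
// A couple of illustrative tests for the thread-local ownership described above. They rely only
// on behaviour documented in this file: the node is claimed on first use and kept for the whole
// lifetime of the thread.
#[cfg(test)]
mod local_node_tests {
    use super::*;

    /// Repeated calls on the same thread hand out the same claimed node.
    #[test]
    fn node_is_stable_per_thread() {
        let first = LocalNode::with(|local| local.node.get().unwrap() as *const Node);
        let second = LocalNode::with(|local| local.node.get().unwrap() as *const Node);
        assert!(ptr::eq(first, second));
    }

    /// The node handed to this thread is marked as used while we own it.
    #[test]
    fn own_node_is_used() {
        LocalNode::with(|local| {
            let node = local.node.get().unwrap();
            assert_eq!(NODE_USED, node.in_use.load(Relaxed));
        });
    }
}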