//! A linked list of debt nodes.
//!
//! A node may or may not be owned by a thread. Reader debts are allocated in the thread's owned
//! node, the writer walks everything (but may also use some owned values).
//!
//! The list is prepend-only ‒ if a thread dies, the node lives on (and can be claimed by another
//! thread later on). This makes the implementation much simpler, since everything here is
//! `'static` and we don't have to care about knowing when to free stuff.
//!
//! The nodes contain both the fast primary slots and the secondary fallback ones.
//!
//! # Synchronization
//!
//! We synchronize several things here.
//!
//! The addition of nodes is synchronized through the head (load on each read, AcqRel on each
//! attempt to add another node). Note that certain parts never change after that (they aren't even
//! atomic) and other things that do change take care of themselves (the debt slots have their own
//! synchronization, etc.).
//!
//! The ownership follows an acquire-release lock pattern.
//!
//! Similarly, the counting of active writers follows an acquire-release lock pattern.
//!
//! We also do a release-acquire "send" from start-cooldown to check-cooldown to make sure we see
//! at least as up-to-date a value of the writer count as when the cooldown started. That way, if
//! we see 0, we know it must have happened since then.
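//!
//! For illustration, a minimal sketch of that cooldown handshake with plain atomics. The names
//! and values are simplified stand-ins for the `in_use`/`active_writers` fields below, not the
//! actual code:
//!
//! ```
//! use std::sync::atomic::{AtomicUsize, Ordering::*};
//!
//! static ACTIVE_WRITERS: AtomicUsize = AtomicUsize::new(0);
//! static IN_USE: AtomicUsize = AtomicUsize::new(1); // 1 ~ used, 2 ~ cooldown
//!
//! // Start-cooldown side: the Release swap publishes everything done to the node so far,
//! // including any change to the writer count.
//! IN_USE.swap(2, Release);
//!
//! // Check-cooldown side: the Acquire load pairs with the Release swap above, so the writer
//! // count read afterwards is at least as new as it was when the cooldown started.
//! if IN_USE.load(Acquire) == 2 && ACTIVE_WRITERS.load(Relaxed) == 0 {
//!     // The 0 we observed happened after the cooldown started; the node may be reclaimed.
//! }
//! ```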

use core::cell::Cell;
use core::ptr;
use core::slice::Iter;
use core::sync::atomic::Ordering::*;
use core::sync::atomic::{AtomicPtr, AtomicUsize};

#[cfg(feature = "experimental-thread-local")]
use core::cell::OnceCell;

use alloc::boxed::Box;

use super::fast::{Local as FastLocal, Slots as FastSlots};
use super::helping::{Local as HelpingLocal, Slots as HelpingSlots};
use super::Debt;
use crate::RefCnt;

const NODE_UNUSED: usize = 0;
const NODE_USED: usize = 1;
const NODE_COOLDOWN: usize = 2;

/// The head of the debt linked list.
static LIST_HEAD: AtomicPtr<Node> = AtomicPtr::new(ptr::null_mut());

pub struct NodeReservation<'a>(&'a Node);

impl Drop for NodeReservation<'_> {
    fn drop(&mut self) {
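        // Release: the releasing half of the acquire-release "lock" on active_writers (see the
        // module docs). It publishes whatever the writer did to the node before the count drops.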
        self.0.active_writers.fetch_sub(1, Release);
    }
}

/// One thread-local node for debts.
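// Aligned to 64 bytes (a common cache line size), presumably so each node sits on its own cache
// line and neighboring nodes don't cause false sharing between threads.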
#[repr(C, align(64))]
pub(crate) struct Node {
    fast: FastSlots,
    helping: HelpingSlots,
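    // One of NODE_UNUSED / NODE_USED / NODE_COOLDOWN.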
    in_use: AtomicUsize,
    // Next node in the list.
    //
    // It is a pointer because we touch it before synchronization (we don't _dereference_ it
    // before synchronization, only manipulate the pointer itself). Doing that through a reference
    // would be illegal according to a strict interpretation of the rules by MIRI.
    next: *const Node,
    active_writers: AtomicUsize,
}

impl Default for Node {
    fn default() -> Self {
        Node {
            fast: FastSlots::default(),
            helping: HelpingSlots::default(),
            in_use: AtomicUsize::new(NODE_USED),
            next: ptr::null(),
            active_writers: AtomicUsize::new(0),
        }
    }
}

impl Node {
    /// Goes through the debt linked list.
    ///
    /// This traverses the linked list, calling the closure on each node. If the closure returns
    /// `Some`, it terminates with that value early, otherwise it runs to the end.
    pub(crate) fn traverse<R, F: FnMut(&'static Node) -> Option<R>>(mut f: F) -> Option<R> {
        // Acquire ‒ we want to make sure we read the correct version of data at the end of the
        // pointer. Any write to the LIST_HEAD is with Release.
        //
        // Furthermore, we need to see the newest version of the list in case we examine the debts
        // - if a new one is added recently, we don't want a stale read -> SeqCst.
        //
        // Note that the other pointers in the chain never change and are *ordinary* pointers. The
        // whole linked list is synchronized through the head.
        let mut current = unsafe { LIST_HEAD.load(SeqCst).as_ref() };
        while let Some(node) = current {
            let result = f(node);
            if result.is_some() {
                return result;
            }
            current = unsafe { node.next.as_ref() };
        }
        None
    }

    /// Put the current thread's node into cooldown.
    fn start_cooldown(&self) {
        // Trick: Make sure we have an up to date value of the active_writers in this thread, so we
        // can properly release it below.
        let _reservation = self.reserve_writer();
        assert_eq!(NODE_USED, self.in_use.swap(NODE_COOLDOWN, Release));
    }

    /// Perform a cooldown if the node is ready.
    ///
    /// See the ABA protection at the [helping] module.
    fn check_cooldown(&self) {
        // Check if the node is in cooldown, for two reasons:
        // * Skip most of the nodes fast, without dealing with them.
        // * More importantly, sync the value of active_writers to be at least the value when the
        //   cooldown started. That way we know the 0 we observe happened some time after
        //   start_cooldown.
        if self.in_use.load(Acquire) == NODE_COOLDOWN {
            // The rest can be nicely relaxed ‒ no memory is being synchronized by these
            // operations. We just see an up to date 0 and allow someone (possibly us) to claim the
            // node later on.
            if self.active_writers.load(Relaxed) == 0 {
                let _ = self
                    .in_use
                    .compare_exchange(NODE_COOLDOWN, NODE_UNUSED, Relaxed, Relaxed);
            }
        }
    }

    /// Mark that a writer is currently playing with this node.
    pub fn reserve_writer(&self) -> NodeReservation {
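        // Acquire: the locking half of the acquire-release pattern on active_writers (see the
        // module docs); pairs with the Release decrement when the reservation is dropped.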
        self.active_writers.fetch_add(1, Acquire);
        NodeReservation(self)
    }

    /// "Allocate" a node.
    ///
    /// Either a new one is created, or a previous one is reused. The node is claimed to become
    /// in_use.
    fn get() -> &'static Self {
        // Try to find an unused one in the chain and reuse it.
        Self::traverse(|node| {
            node.check_cooldown();
            if node
                .in_use
                // We claim a unique control over the generation and the right to write to slots if
                // they are `Debt::NONE`.
                .compare_exchange(NODE_UNUSED, NODE_USED, SeqCst, Relaxed)
                .is_ok()
            {
                Some(node)
            } else {
                None
            }
        })
        // If that didn't work, create a new one and prepend to the list.
        .unwrap_or_else(|| {
            let node = Box::leak(Box::<Node>::default());
            node.helping.init();
            // We don't want to read any data in addition to the head, Relaxed is fine
            // here.
            //
            // We do need to release the data to others, but for that, we acquire in the
            // compare_exchange below.
            let mut head = LIST_HEAD.load(Relaxed);
            loop {
                node.next = head;
                if let Err(old) = LIST_HEAD.compare_exchange_weak(
                    head, node,
                    // We need to release *the whole chain* here. For that, we need to
                    // acquire it first.
                    //
                    // SeqCst because we need to make sure it is properly set "before" we do
                    // anything to the debts.
                    SeqCst, Relaxed, // Nothing changed, go next round of the loop.
                ) {
                    head = old;
                } else {
                    return node;
                }
            }
        })
    }

    /// Iterate over the fast slots.
    pub(crate) fn fast_slots(&self) -> Iter<Debt> {
        self.fast.into_iter()
    }

    /// Access the helping slot.
    pub(crate) fn helping_slot(&self) -> &Debt {
        self.helping.slot()
    }
}

/// A wrapper around a node pointer, to un-claim the node on thread shutdown.
pub(crate) struct LocalNode {
    /// Node for this thread, if any.
    ///
    /// We don't necessarily have to own one, but if we don't, we'll get one before the first use.
    node: Cell<Option<&'static Node>>,

    /// Thread-local data for the fast slots.
    fast: FastLocal,

    /// Thread-local data for the helping strategy.
    helping: HelpingLocal,
}

impl LocalNode {
    #[cfg(not(feature = "experimental-thread-local"))]
    pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
        let f = Cell::new(Some(f));
        THREAD_HEAD
            .try_with(|head| {
                if head.node.get().is_none() {
                    head.node.set(Some(Node::get()));
                }
                let f = f.take().unwrap();
                f(head)
            })
            // During the application shutdown, the thread local storage may be already
            // deallocated. In that case, the above fails but we still need something. So we just
            // find or allocate a node and use it just once.
            //
            // Note that this situation should be very rare, so the slower performance doesn't
            // matter that much.
            .unwrap_or_else(|_| {
                let tmp_node = LocalNode {
                    node: Cell::new(Some(Node::get())),
                    fast: FastLocal::default(),
                    helping: HelpingLocal::default(),
                };
                let f = f.take().unwrap();
                f(&tmp_node)
                // Drop of tmp_node -> sends the node we just used into cooldown.
            })
    }

    #[cfg(feature = "experimental-thread-local")]
    pub(crate) fn with<R, F: FnOnce(&LocalNode) -> R>(f: F) -> R {
        let thread_head = THREAD_HEAD.get_or_init(|| LocalNode {
            node: Cell::new(None),
            fast: FastLocal::default(),
            helping: HelpingLocal::default(),
        });
        if thread_head.node.get().is_none() {
            thread_head.node.set(Some(Node::get()));
        }
        f(thread_head)
    }

    /// Creates a new debt.
    ///
    /// This stores the debt of the given pointer (untyped, cast into a usize) and returns a
    /// reference to that slot, or gives up with `None` if all the slots are currently full.
    #[inline]
    pub(crate) fn new_fast(&self, ptr: usize) -> Option<&'static Debt> {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        node.fast.get_debt(ptr, &self.fast)
    }

    /// Initializes a helping slot transaction.
    ///
    /// Returns the generation (with tag).
    pub(crate) fn new_helping(&self, ptr: usize) -> usize {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        let (gen, discard) = node.helping.get_debt(ptr, &self.helping);
        if discard {
            // Too many generations happened, make sure the writers give the poor node a break for
            // a while so they don't observe the generation wrapping around.
            node.start_cooldown();
            self.node.take();
        }
        gen
    }

    /// Confirm the helping transaction.
    ///
    /// The generation comes from a previous `new_helping`.
    ///
    /// Will either return a debt with the pointer, or a debt to pay and a replacement (already
    /// protected) address.
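    ///
    /// Illustrative call sequence (a sketch of how the caller is expected to pair it with
    /// [`new_helping`](Self::new_helping), not code from this module):
    ///
    /// ```ignore
    /// let gen = local.new_helping(ptr);
    /// // ... re-check the storage ...
    /// match local.confirm_helping(gen, ptr) {
    ///     Ok(debt) => { /* the debt now protects `ptr` */ }
    ///     Err((debt, replacement)) => { /* pay off `debt`, use the already protected `replacement` */ }
    /// }
    /// ```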
    pub(crate) fn confirm_helping(
        &self,
        gen: usize,
        ptr: usize,
    ) -> Result<&'static Debt, (&'static Debt, usize)> {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        let slot = node.helping_slot();
        node.helping
            .confirm(gen, ptr)
            .map(|()| slot)
            .map_err(|repl| (slot, repl))
    }

    /// The writer side of a helping slot.
    ///
    /// This potentially helps the `who` node (uses self as the local node, which must be
    /// different) by loading the address that one is trying to load.
    pub(super) fn help<R, T>(&self, who: &Node, storage_addr: usize, replacement: &R)
    where
        T: RefCnt,
        R: Fn() -> T,
    {
        let node = &self.node.get().expect("LocalNode::with ensures it is set");
        debug_assert_eq!(node.in_use.load(Relaxed), NODE_USED);
        node.helping.help(&who.helping, storage_addr, replacement)
    }
}

impl Drop for LocalNode {
    fn drop(&mut self) {
        if let Some(node) = self.node.get() {
            // Release - syncing writes/ownership of this Node
            node.start_cooldown();
        }
    }
}

#[cfg(not(feature = "experimental-thread-local"))]
thread_local! {
    /// A debt node assigned to this thread.
    static THREAD_HEAD: LocalNode = LocalNode {
        node: Cell::new(None),
        fast: FastLocal::default(),
        helping: HelpingLocal::default(),
    };
}

#[cfg(feature = "experimental-thread-local")]
#[thread_local]
/// A debt node assigned to this thread.
static THREAD_HEAD: OnceCell<LocalNode> = OnceCell::new();

#[cfg(test)]
mod tests {
    use super::*;

    impl Node {
        fn is_empty(&self) -> bool {
            self.fast_slots()
                .chain(core::iter::once(self.helping_slot()))
                .all(|d| d.0.load(Relaxed) == Debt::NONE)
        }

        fn get_thread() -> &'static Self {
            LocalNode::with(|h| h.node.get().unwrap())
        }
    }

    /// A freshly acquired thread-local node is empty.
    #[test]
    fn new_empty() {
        assert!(Node::get_thread().is_empty());
    }
}