//! The global data and participant for garbage collection.
//!
//! # Registration
//!
//! In order to track all participants in one place, we need some form of participant
//! registration. When a participant is created, it is registered in a global lock-free
//! singly-linked list of participants; and when a participant leaves, it is unregistered from the
//! list.
//!
//! # Pinning
//!
//! Every participant contains an integer that tells whether the participant is pinned and, if so,
//! what the global epoch was at the time it was pinned. Participants also hold a pin counter that
//! aids in periodic global epoch advancement.
//!
//! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
//! Guards are necessary for performing atomic operations and for freeing/dropping locations.
//!
//! # Thread-local bag
//!
//! Objects that get unlinked from concurrent data structures must be stashed away until the global
//! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
//! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current
//! global epoch and pushed into the global queue of bags. We store objects in thread-local storage
//! to amortize the synchronization cost of pushing garbage to the global queue.
//!
//! # Global queue
//!
//! Whenever a bag is pushed into the queue, the objects in some bags in the queue are collected and
//! destroyed along the way. This design reduces contention on data structures. The global queue
//! cannot be explicitly accessed: the only way to interact with it is through `defer()`, which adds
//! an object to the thread-local bag, and `collect()`, which manually triggers garbage collection.
//!
//! Ideally, each instance of a concurrent data structure would have its own queue that is fully
//! destroyed as soon as the data structure itself is dropped.
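//!
//! # Example
//!
//! A rough sketch of how the machinery above is driven through the crate's public API (the
//! default collector registers the current thread on first use); illustrative only:
//!
//! ```
//! use crossbeam_epoch as epoch;
//!
//! // Pinning the current participant yields a `Guard`.
//! let guard = epoch::pin();
//!
//! // `defer` stashes a function in the thread-local bag; it runs only after the global epoch
//! // has sufficiently advanced.
//! guard.defer(|| println!("no participant can reach the object anymore"));
//!
//! // `flush` pushes the thread-local bag into the global queue right away.
//! guard.flush();
//! ```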

use crate::primitive::cell::UnsafeCell;
use crate::primitive::sync::atomic;
use core::cell::Cell;
use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
use core::sync::atomic::Ordering;
use core::{fmt, ptr};

use crossbeam_utils::CachePadded;
use memoffset::offset_of;

use crate::atomic::{Owned, Shared};
use crate::collector::{Collector, LocalHandle};
use crate::deferred::Deferred;
use crate::epoch::{AtomicEpoch, Epoch};
use crate::guard::{unprotected, Guard};
use crate::sync::list::{Entry, IsElement, IterError, List};
use crate::sync::queue::Queue;

/// Maximum number of objects a bag can contain.
#[cfg(not(any(crossbeam_sanitize, miri)))]
const MAX_OBJECTS: usize = 64;
// Makes it more likely to trigger any potential data races.
#[cfg(any(crossbeam_sanitize, miri))]
const MAX_OBJECTS: usize = 4;

/// A bag of deferred functions.
pub(crate) struct Bag {
    /// Stashed objects.
    deferreds: [Deferred; MAX_OBJECTS],
    len: usize,
}

/// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
unsafe impl Send for Bag {}

impl Bag {
    /// Returns a new, empty bag.
    pub(crate) fn new() -> Self {
        Self::default()
    }

    /// Returns `true` if the bag is empty.
    pub(crate) fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Attempts to insert a deferred function into the bag.
    ///
    /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
    /// full.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
        if self.len < MAX_OBJECTS {
            self.deferreds[self.len] = deferred;
            self.len += 1;
            Ok(())
        } else {
            Err(deferred)
        }
    }

    /// Seals the bag with the given epoch.
    fn seal(self, epoch: Epoch) -> SealedBag {
        SealedBag { epoch, _bag: self }
    }
}

impl Default for Bag {
    fn default() -> Self {
        Bag {
            len: 0,
            deferreds: [Deferred::NO_OP; MAX_OBJECTS],
        }
    }
}

impl Drop for Bag {
    fn drop(&mut self) {
        // Call all deferred functions.
        for deferred in &mut self.deferreds[..self.len] {
            let no_op = Deferred::NO_OP;
            let owned_deferred = mem::replace(deferred, no_op);
            owned_deferred.call();
        }
    }
}

// can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long
impl fmt::Debug for Bag {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Bag")
            .field("deferreds", &&self.deferreds[..self.len])
            .finish()
    }
}

/// A pair of an epoch and a bag.
#[derive(Default, Debug)]
struct SealedBag {
    epoch: Epoch,
    _bag: Bag,
}

/// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
unsafe impl Sync for SealedBag {}

impl SealedBag {
    /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
    fn is_expired(&self, global_epoch: Epoch) -> bool {
        // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
        // is within one epoch of the current one cannot be destroyed yet.
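        // For example, a bag sealed at epoch 5 must not be destroyed while the global epoch is 5
        // or 6, because a participant pinned in epoch 5 may still hold references into it; once
        // the global epoch reaches 7, no such participant remains.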
        global_epoch.wrapping_sub(self.epoch) >= 2
    }
}

/// The global data for a garbage collector.
pub(crate) struct Global {
    /// The intrusive linked list of `Local`s.
    locals: List<Local>,

    /// The global queue of bags of deferred functions.
    queue: Queue<SealedBag>,

    /// The global epoch.
    pub(crate) epoch: CachePadded<AtomicEpoch>,
}

impl Global {
    /// Number of bags to destroy.
    const COLLECT_STEPS: usize = 8;

    /// Creates new global data for garbage collection.
    #[inline]
    pub(crate) fn new() -> Self {
        Self {
            locals: List::new(),
            queue: Queue::new(),
            epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
        }
    }

    /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
    pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
        let bag = mem::replace(bag, Bag::new());

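        // Note: the full fence orders the unlinking of the objects now sitting in `bag` before
        // the relaxed epoch load below, so the bag is sealed with an epoch recent enough for
        // `SealedBag::is_expired` to stay conservative.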
        atomic::fence(Ordering::SeqCst);

        let epoch = self.epoch.load(Ordering::Relaxed);
        self.queue.push(bag.seal(epoch), guard);
    }

    /// Collects several bags from the global queue and executes deferred functions in them.
    ///
    /// Note: This may itself produce garbage and in turn allocate new bags.
    ///
    /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
    /// path. In other words, we want the compiler to optimize branching for the case when
    /// `collect()` is not called.
    #[cold]
    pub(crate) fn collect(&self, guard: &Guard) {
        let global_epoch = self.try_advance(guard);

        let steps = if cfg!(crossbeam_sanitize) {
            usize::max_value()
        } else {
            Self::COLLECT_STEPS
        };

        for _ in 0..steps {
            match self.queue.try_pop_if(
                &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
                guard,
            ) {
                None => break,
                Some(sealed_bag) => drop(sealed_bag),
            }
        }
    }

    /// Attempts to advance the global epoch.
    ///
    /// The global epoch can advance only if all currently pinned participants have been pinned in
    /// the current epoch.
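    /// For instance, if the global epoch is 6, every pinned participant is pinned in epoch 5 or
    /// 6, and the epoch can be advanced to 7 only once all pinned participants are observed in
    /// epoch 6.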
    ///
    /// Returns the current global epoch.
    ///
    /// `try_advance()` is annotated `#[cold]` because it is rarely called.
    #[cold]
    pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
        let global_epoch = self.epoch.load(Ordering::Relaxed);
        atomic::fence(Ordering::SeqCst);

        // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
        // easy to implement in a lock-free manner. However, traversal can be slow due to cache
        // misses and data dependencies. We should experiment with other data structures as well.
        for local in self.locals.iter(guard) {
            match local {
                Err(IterError::Stalled) => {
                    // A concurrent thread stalled this iteration. That thread might also try to
                    // advance the epoch, in which case we leave the job to it. Otherwise, the
                    // epoch will not be advanced.
                    return global_epoch;
                }
                Ok(local) => {
                    let local_epoch = local.epoch.load(Ordering::Relaxed);

                    // If the participant was pinned in a different epoch, we cannot advance the
                    // global epoch just yet.
                    if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
                        return global_epoch;
                    }
                }
            }
        }
        atomic::fence(Ordering::Acquire);

        // All pinned participants were pinned in the current global epoch.
        // Now let's advance the global epoch...
        //
        // Note that if another thread already advanced it before us, this store will simply
        // overwrite the global epoch with the same value. This is true because `try_advance` was
        // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
        // advanced two steps ahead of it.
        let new_epoch = global_epoch.successor();
        self.epoch.store(new_epoch, Ordering::Release);
        new_epoch
    }
}

/// Participant for garbage collection.
pub(crate) struct Local {
    /// A node in the intrusive linked list of `Local`s.
    entry: Entry,

    /// The local epoch.
    epoch: AtomicEpoch,

    /// A reference to the global data.
    ///
    /// When all guards and handles get dropped, this reference is destroyed.
    collector: UnsafeCell<ManuallyDrop<Collector>>,

    /// The local bag of deferred functions.
    pub(crate) bag: UnsafeCell<Bag>,

    /// The number of guards keeping this participant pinned.
    guard_count: Cell<usize>,

    /// The number of active handles.
    handle_count: Cell<usize>,

    /// Total number of pinnings performed.
    ///
    /// This is just an auxiliary counter that sometimes kicks off collection.
    pin_count: Cell<Wrapping<usize>>,
}

// Make sure `Local` is less than or equal to 2048 bytes.
// https://github.com/crossbeam-rs/crossbeam/issues/551
#[cfg(not(any(crossbeam_sanitize, miri)))] // `crossbeam_sanitize` and `miri` reduce the size of `Local`
#[test]
fn local_size() {
    // TODO: https://github.com/crossbeam-rs/crossbeam/issues/869
    // assert!(
    //     core::mem::size_of::<Local>() <= 2048,
    //     "An allocation of `Local` should be <= 2048 bytes."
    // );
}

impl Local {
    /// Number of pinnings after which a participant will execute some deferred functions from the
    /// global queue.
    const PINNINGS_BETWEEN_COLLECT: usize = 128;

    /// Registers a new `Local` in the provided `Global`.
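    ///
    /// A sketch of how registration is typically reached through the public `Collector` API;
    /// illustrative only:
    ///
    /// ```
    /// use crossbeam_epoch::Collector;
    ///
    /// let collector = Collector::new();
    /// // Registration creates a `Local`, links it into the global list, and returns a handle.
    /// let handle = collector.register();
    /// // Pinning through the handle pins the underlying `Local`.
    /// let guard = handle.pin();
    /// drop(guard);
    /// ```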
    pub(crate) fn register(collector: &Collector) -> LocalHandle {
        unsafe {
            // Since we dereference no pointers in this block, it is safe to use `unprotected`.

            let local = Owned::new(Local {
                entry: Entry::default(),
                epoch: AtomicEpoch::new(Epoch::starting()),
                collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
                bag: UnsafeCell::new(Bag::new()),
                guard_count: Cell::new(0),
                handle_count: Cell::new(1),
                pin_count: Cell::new(Wrapping(0)),
            })
            .into_shared(unprotected());
            collector.global.locals.insert(local, unprotected());
            LocalHandle {
                local: local.as_raw(),
            }
        }
    }

    /// Returns a reference to the `Global` in which this `Local` resides.
    #[inline]
    pub(crate) fn global(&self) -> &Global {
        &self.collector().global
    }

    /// Returns a reference to the `Collector` in which this `Local` resides.
    #[inline]
    pub(crate) fn collector(&self) -> &Collector {
        self.collector.with(|c| unsafe { &**c })
    }

    /// Returns `true` if the current participant is pinned.
    #[inline]
    pub(crate) fn is_pinned(&self) -> bool {
        self.guard_count.get() > 0
    }

    /// Adds `deferred` to the thread-local bag.
    ///
    /// # Safety
    ///
    /// It should be safe for another thread to execute the given function.
    pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
        let bag = self.bag.with_mut(|b| &mut *b);

        while let Err(d) = bag.try_push(deferred) {
            self.global().push_bag(bag, guard);
            deferred = d;
        }
    }

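    /// Pushes the thread-local bag into the global queue if it is non-empty, then collects some
    /// garbage from the global queue.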
    pub(crate) fn flush(&self, guard: &Guard) {
        let bag = self.bag.with_mut(|b| unsafe { &mut *b });

        if !bag.is_empty() {
            self.global().push_bag(bag, guard);
        }

        self.global().collect(guard);
    }

    /// Pins the `Local`.
    #[inline]
    pub(crate) fn pin(&self) -> Guard {
        let guard = Guard { local: self };

        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count.checked_add(1).unwrap());

        if guard_count == 0 {
            let global_epoch = self.global().epoch.load(Ordering::Relaxed);
            let new_epoch = global_epoch.pinned();

            // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
            // The fence makes sure that any future loads from `Atomic`s will not happen before
            // this store.
            if cfg!(all(
                any(target_arch = "x86", target_arch = "x86_64"),
                not(miri)
            )) {
                // HACK(stjepang): On x86 architectures there are two different ways of executing
                // a `SeqCst` fence.
                //
                // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
                // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
                //    instruction.
                //
                // Both instructions have the effect of a full barrier, but benchmarks have shown
                // that the second one makes pinning faster in this particular case.  It is not
                // clear that this is permitted by the C++ memory model (SC fences work very
                // differently from SC accesses), but experimental evidence suggests that this
                // works fine.  Using inline assembly would be a viable (and correct) alternative,
                // but alas, that is not possible on stable Rust.
                let current = Epoch::starting();
                let res = self.epoch.compare_exchange(
                    current,
                    new_epoch,
                    Ordering::SeqCst,
                    Ordering::SeqCst,
                );
                debug_assert!(res.is_ok(), "participant was expected to be unpinned");
                // We add a compiler fence to make it less likely for LLVM to do something wrong
                // here.  Formally, this is not enough to get rid of data races; practically,
                // it should go a long way.
                atomic::compiler_fence(Ordering::SeqCst);
            } else {
                self.epoch.store(new_epoch, Ordering::Relaxed);
                atomic::fence(Ordering::SeqCst);
            }

            // Increment the pin counter.
            let count = self.pin_count.get();
            self.pin_count.set(count + Wrapping(1));

            // After every `PINNINGS_BETWEEN_COLLECT` pinnings, try advancing the epoch and
            // collecting some garbage.
            if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
                self.global().collect(&guard);
            }
        }

        guard
    }

    /// Unpins the `Local`.
    #[inline]
    pub(crate) fn unpin(&self) {
        let guard_count = self.guard_count.get();
        self.guard_count.set(guard_count - 1);

        if guard_count == 1 {
            self.epoch.store(Epoch::starting(), Ordering::Release);

            if self.handle_count.get() == 0 {
                self.finalize();
            }
        }
    }

    /// Unpins and then pins the `Local`.
    #[inline]
    pub(crate) fn repin(&self) {
        let guard_count = self.guard_count.get();

        // Update the local epoch only if there's only one guard.
        if guard_count == 1 {
            let epoch = self.epoch.load(Ordering::Relaxed);
            let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();

            // Update the local epoch only if the global epoch is greater than the local epoch.
            if epoch != global_epoch {
                // We store the new epoch with `Release` because we need to ensure any memory
                // accesses from the previous epoch do not leak into the new one.
                self.epoch.store(global_epoch, Ordering::Release);

                // However, we don't need a following `SeqCst` fence, because it is safe for memory
                // accesses from the new epoch to be executed before updating the local epoch. At
                // worst, other threads will see the new epoch late and delay GC slightly.
            }
        }
    }

    /// Increments the handle count.
    #[inline]
    pub(crate) fn acquire_handle(&self) {
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count + 1);
    }

    /// Decrements the handle count.
    #[inline]
    pub(crate) fn release_handle(&self) {
        let guard_count = self.guard_count.get();
        let handle_count = self.handle_count.get();
        debug_assert!(handle_count >= 1);
        self.handle_count.set(handle_count - 1);

        if guard_count == 0 && handle_count == 1 {
            self.finalize();
        }
    }

    /// Removes the `Local` from the global linked list.
    #[cold]
    fn finalize(&self) {
        debug_assert_eq!(self.guard_count.get(), 0);
        debug_assert_eq!(self.handle_count.get(), 0);

        // Temporarily increment handle count. This is required so that the following call to `pin`
        // doesn't call `finalize` again.
        self.handle_count.set(1);
        unsafe {
            // Pin and move the local bag into the global queue. It's important that `push_bag`
            // doesn't defer destruction on any new garbage.
            let guard = &self.pin();
            self.global()
                .push_bag(self.bag.with_mut(|b| &mut *b), guard);
        }
        // Revert the handle count back to zero.
        self.handle_count.set(0);

        unsafe {
            // Take the reference to the `Global` out of this `Local`. Since we're not protected
            // by a guard at this time, it's crucial that the reference is read before marking the
            // `Local` as deleted.
            let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));

            // Mark this node in the linked list as deleted.
            self.entry.delete(unprotected());

            // Finally, drop the reference to the global. Note that this might be the last reference
            // to the `Global`. If so, the global data will be destroyed and all deferred functions
            // in its queue will be executed.
            drop(collector);
        }
    }
}

impl IsElement<Local> for Local {
    fn entry_of(local: &Local) -> &Entry {
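        // Compute the address of the `entry` field from the address of the containing `Local`.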
        let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
        unsafe { &*entry_ptr }
    }

    unsafe fn element_of(entry: &Entry) -> &Local {
        // offset_of! macro uses unsafe, but it's unnecessary in this context.
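        // Compute the address of the containing `Local` from the address of its `entry` field.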
        #[allow(unused_unsafe)]
        let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
        &*local_ptr
    }

    unsafe fn finalize(entry: &Entry, guard: &Guard) {
        guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
    }
}

#[cfg(all(test, not(crossbeam_loom)))]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    #[test]
    fn check_defer() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn set() {
            FLAG.store(42, Ordering::Relaxed);
        }

        let d = Deferred::new(set);
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        d.call();
        assert_eq!(FLAG.load(Ordering::Relaxed), 42);
    }

    #[test]
    fn check_bag() {
        static FLAG: AtomicUsize = AtomicUsize::new(0);
        fn incr() {
            FLAG.fetch_add(1, Ordering::Relaxed);
        }

        let mut bag = Bag::new();
        assert!(bag.is_empty());

        for _ in 0..MAX_OBJECTS {
            assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
            assert!(!bag.is_empty());
            assert_eq!(FLAG.load(Ordering::Relaxed), 0);
        }

        let result = unsafe { bag.try_push(Deferred::new(incr)) };
        assert!(result.is_err());
        assert!(!bag.is_empty());
        assert_eq!(FLAG.load(Ordering::Relaxed), 0);

        drop(bag);
        assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
    }
}