• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! The global data and participant for garbage collection.
2 //!
3 //! # Registration
4 //!
5 //! In order to track all participants in one place, we need some form of participant
6 //! registration. When a participant is created, it is registered to a global lock-free
7 //! singly-linked list of registries; and when a participant is leaving, it is unregistered from the
8 //! list.
9 //!
10 //! # Pinning
11 //!
12 //! Every participant contains an integer that tells whether the participant is pinned and if so,
13 //! what was the global epoch at the time it was pinned. Participants also hold a pin counter that
14 //! aids in periodic global epoch advancement.
15 //!
16 //! When a participant is pinned, a `Guard` is returned as a witness that the participant is pinned.
17 //! Guards are necessary for performing atomic operations, and for freeing/dropping locations.
18 //!
19 //! # Thread-local bag
20 //!
21 //! Objects that get unlinked from concurrent data structures must be stashed away until the global
22 //! epoch sufficiently advances so that they become safe for destruction. Pointers to such objects
23 //! are pushed into a thread-local bag, and when it becomes full, the bag is marked with the current
24 //! global epoch and pushed into the global queue of bags. We store objects in thread-local storages
25 //! for amortizing the synchronization cost of pushing the garbages to a global queue.
26 //!
27 //! # Global queue
28 //!
29 //! Whenever a bag is pushed into a queue, the objects in some bags in the queue are collected and
30 //! destroyed along the way. This design reduces contention on data structures. The global queue
31 //! cannot be explicitly accessed: the only way to interact with it is by calling functions
32 //! `defer()` that adds an object to the thread-local bag, or `collect()` that manually triggers
33 //! garbage collection.
34 //!
35 //! Ideally each instance of concurrent data structure may have its own queue that gets fully
36 //! destroyed as soon as the data structure gets dropped.
37 
38 use crate::primitive::cell::UnsafeCell;
39 use crate::primitive::sync::atomic;
40 use core::cell::Cell;
41 use core::mem::{self, ManuallyDrop};
42 use core::num::Wrapping;
43 use core::sync::atomic::Ordering;
44 use core::{fmt, ptr};
45 
46 use crossbeam_utils::CachePadded;
47 use memoffset::offset_of;
48 
49 use crate::atomic::{Owned, Shared};
50 use crate::collector::{Collector, LocalHandle};
51 use crate::deferred::Deferred;
52 use crate::epoch::{AtomicEpoch, Epoch};
53 use crate::guard::{unprotected, Guard};
54 use crate::sync::list::{Entry, IsElement, IterError, List};
55 use crate::sync::queue::Queue;
56 
57 /// Maximum number of objects a bag can contain.
58 #[cfg(not(crossbeam_sanitize))]
59 const MAX_OBJECTS: usize = 62;
60 #[cfg(crossbeam_sanitize)]
61 const MAX_OBJECTS: usize = 4;
62 
63 /// A bag of deferred functions.
64 pub(crate) struct Bag {
65     /// Stashed objects.
66     deferreds: [Deferred; MAX_OBJECTS],
67     len: usize,
68 }
69 
70 /// `Bag::try_push()` requires that it is safe for another thread to execute the given functions.
71 unsafe impl Send for Bag {}
72 
73 impl Bag {
74     /// Returns a new, empty bag.
new() -> Self75     pub(crate) fn new() -> Self {
76         Self::default()
77     }
78 
79     /// Returns `true` if the bag is empty.
is_empty(&self) -> bool80     pub(crate) fn is_empty(&self) -> bool {
81         self.len == 0
82     }
83 
84     /// Attempts to insert a deferred function into the bag.
85     ///
86     /// Returns `Ok(())` if successful, and `Err(deferred)` for the given `deferred` if the bag is
87     /// full.
88     ///
89     /// # Safety
90     ///
91     /// It should be safe for another thread to execute the given function.
try_push(&mut self, deferred: Deferred) -> Result<(), Deferred>92     pub(crate) unsafe fn try_push(&mut self, deferred: Deferred) -> Result<(), Deferred> {
93         if self.len < MAX_OBJECTS {
94             self.deferreds[self.len] = deferred;
95             self.len += 1;
96             Ok(())
97         } else {
98             Err(deferred)
99         }
100     }
101 
102     /// Seals the bag with the given epoch.
seal(self, epoch: Epoch) -> SealedBag103     fn seal(self, epoch: Epoch) -> SealedBag {
104         SealedBag { epoch, _bag: self }
105     }
106 }
107 
108 impl Default for Bag {
109     #[rustfmt::skip]
default() -> Self110     fn default() -> Self {
111         // TODO: [no_op; MAX_OBJECTS] syntax blocked by https://github.com/rust-lang/rust/issues/49147
112         #[cfg(not(crossbeam_sanitize))]
113         return Bag {
114             len: 0,
115             deferreds: [
116                 Deferred::new(no_op_func),
117                 Deferred::new(no_op_func),
118                 Deferred::new(no_op_func),
119                 Deferred::new(no_op_func),
120                 Deferred::new(no_op_func),
121                 Deferred::new(no_op_func),
122                 Deferred::new(no_op_func),
123                 Deferred::new(no_op_func),
124                 Deferred::new(no_op_func),
125                 Deferred::new(no_op_func),
126                 Deferred::new(no_op_func),
127                 Deferred::new(no_op_func),
128                 Deferred::new(no_op_func),
129                 Deferred::new(no_op_func),
130                 Deferred::new(no_op_func),
131                 Deferred::new(no_op_func),
132                 Deferred::new(no_op_func),
133                 Deferred::new(no_op_func),
134                 Deferred::new(no_op_func),
135                 Deferred::new(no_op_func),
136                 Deferred::new(no_op_func),
137                 Deferred::new(no_op_func),
138                 Deferred::new(no_op_func),
139                 Deferred::new(no_op_func),
140                 Deferred::new(no_op_func),
141                 Deferred::new(no_op_func),
142                 Deferred::new(no_op_func),
143                 Deferred::new(no_op_func),
144                 Deferred::new(no_op_func),
145                 Deferred::new(no_op_func),
146                 Deferred::new(no_op_func),
147                 Deferred::new(no_op_func),
148                 Deferred::new(no_op_func),
149                 Deferred::new(no_op_func),
150                 Deferred::new(no_op_func),
151                 Deferred::new(no_op_func),
152                 Deferred::new(no_op_func),
153                 Deferred::new(no_op_func),
154                 Deferred::new(no_op_func),
155                 Deferred::new(no_op_func),
156                 Deferred::new(no_op_func),
157                 Deferred::new(no_op_func),
158                 Deferred::new(no_op_func),
159                 Deferred::new(no_op_func),
160                 Deferred::new(no_op_func),
161                 Deferred::new(no_op_func),
162                 Deferred::new(no_op_func),
163                 Deferred::new(no_op_func),
164                 Deferred::new(no_op_func),
165                 Deferred::new(no_op_func),
166                 Deferred::new(no_op_func),
167                 Deferred::new(no_op_func),
168                 Deferred::new(no_op_func),
169                 Deferred::new(no_op_func),
170                 Deferred::new(no_op_func),
171                 Deferred::new(no_op_func),
172                 Deferred::new(no_op_func),
173                 Deferred::new(no_op_func),
174                 Deferred::new(no_op_func),
175                 Deferred::new(no_op_func),
176                 Deferred::new(no_op_func),
177                 Deferred::new(no_op_func),
178             ],
179         };
180         #[cfg(crossbeam_sanitize)]
181         return Bag {
182             len: 0,
183             deferreds: [
184                 Deferred::new(no_op_func),
185                 Deferred::new(no_op_func),
186                 Deferred::new(no_op_func),
187                 Deferred::new(no_op_func),
188             ],
189         };
190     }
191 }
192 
193 impl Drop for Bag {
drop(&mut self)194     fn drop(&mut self) {
195         // Call all deferred functions.
196         for deferred in &mut self.deferreds[..self.len] {
197             let no_op = Deferred::new(no_op_func);
198             let owned_deferred = mem::replace(deferred, no_op);
199             owned_deferred.call();
200         }
201     }
202 }
203 
204 // can't #[derive(Debug)] because Debug is not implemented for arrays 64 items long
205 impl fmt::Debug for Bag {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result206     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207         f.debug_struct("Bag")
208             .field("deferreds", &&self.deferreds[..self.len])
209             .finish()
210     }
211 }
212 
no_op_func()213 fn no_op_func() {}
214 
215 /// A pair of an epoch and a bag.
216 #[derive(Default, Debug)]
217 struct SealedBag {
218     epoch: Epoch,
219     _bag: Bag,
220 }
221 
222 /// It is safe to share `SealedBag` because `is_expired` only inspects the epoch.
223 unsafe impl Sync for SealedBag {}
224 
225 impl SealedBag {
226     /// Checks if it is safe to drop the bag w.r.t. the given global epoch.
is_expired(&self, global_epoch: Epoch) -> bool227     fn is_expired(&self, global_epoch: Epoch) -> bool {
228         // A pinned participant can witness at most one epoch advancement. Therefore, any bag that
229         // is within one epoch of the current one cannot be destroyed yet.
230         global_epoch.wrapping_sub(self.epoch) >= 2
231     }
232 }
233 
234 /// The global data for a garbage collector.
235 pub(crate) struct Global {
236     /// The intrusive linked list of `Local`s.
237     locals: List<Local>,
238 
239     /// The global queue of bags of deferred functions.
240     queue: Queue<SealedBag>,
241 
242     /// The global epoch.
243     pub(crate) epoch: CachePadded<AtomicEpoch>,
244 }
245 
246 impl Global {
247     /// Number of bags to destroy.
248     const COLLECT_STEPS: usize = 8;
249 
250     /// Creates a new global data for garbage collection.
251     #[inline]
new() -> Self252     pub(crate) fn new() -> Self {
253         Self {
254             locals: List::new(),
255             queue: Queue::new(),
256             epoch: CachePadded::new(AtomicEpoch::new(Epoch::starting())),
257         }
258     }
259 
260     /// Pushes the bag into the global queue and replaces the bag with a new empty bag.
push_bag(&self, bag: &mut Bag, guard: &Guard)261     pub(crate) fn push_bag(&self, bag: &mut Bag, guard: &Guard) {
262         let bag = mem::replace(bag, Bag::new());
263 
264         atomic::fence(Ordering::SeqCst);
265 
266         let epoch = self.epoch.load(Ordering::Relaxed);
267         self.queue.push(bag.seal(epoch), guard);
268     }
269 
270     /// Collects several bags from the global queue and executes deferred functions in them.
271     ///
272     /// Note: This may itself produce garbage and in turn allocate new bags.
273     ///
274     /// `pin()` rarely calls `collect()`, so we want the compiler to place that call on a cold
275     /// path. In other words, we want the compiler to optimize branching for the case when
276     /// `collect()` is not called.
277     #[cold]
collect(&self, guard: &Guard)278     pub(crate) fn collect(&self, guard: &Guard) {
279         let global_epoch = self.try_advance(guard);
280 
281         let steps = if cfg!(crossbeam_sanitize) {
282             usize::max_value()
283         } else {
284             Self::COLLECT_STEPS
285         };
286 
287         for _ in 0..steps {
288             match self.queue.try_pop_if(
289                 &|sealed_bag: &SealedBag| sealed_bag.is_expired(global_epoch),
290                 guard,
291             ) {
292                 None => break,
293                 Some(sealed_bag) => drop(sealed_bag),
294             }
295         }
296     }
297 
298     /// Attempts to advance the global epoch.
299     ///
300     /// The global epoch can advance only if all currently pinned participants have been pinned in
301     /// the current epoch.
302     ///
303     /// Returns the current global epoch.
304     ///
305     /// `try_advance()` is annotated `#[cold]` because it is rarely called.
306     #[cold]
try_advance(&self, guard: &Guard) -> Epoch307     pub(crate) fn try_advance(&self, guard: &Guard) -> Epoch {
308         let global_epoch = self.epoch.load(Ordering::Relaxed);
309         atomic::fence(Ordering::SeqCst);
310 
311         // TODO(stjepang): `Local`s are stored in a linked list because linked lists are fairly
312         // easy to implement in a lock-free manner. However, traversal can be slow due to cache
313         // misses and data dependencies. We should experiment with other data structures as well.
314         for local in self.locals.iter(guard) {
315             match local {
316                 Err(IterError::Stalled) => {
317                     // A concurrent thread stalled this iteration. That thread might also try to
318                     // advance the epoch, in which case we leave the job to it. Otherwise, the
319                     // epoch will not be advanced.
320                     return global_epoch;
321                 }
322                 Ok(local) => {
323                     let local_epoch = local.epoch.load(Ordering::Relaxed);
324 
325                     // If the participant was pinned in a different epoch, we cannot advance the
326                     // global epoch just yet.
327                     if local_epoch.is_pinned() && local_epoch.unpinned() != global_epoch {
328                         return global_epoch;
329                     }
330                 }
331             }
332         }
333         atomic::fence(Ordering::Acquire);
334 
335         // All pinned participants were pinned in the current global epoch.
336         // Now let's advance the global epoch...
337         //
338         // Note that if another thread already advanced it before us, this store will simply
339         // overwrite the global epoch with the same value. This is true because `try_advance` was
340         // called from a thread that was pinned in `global_epoch`, and the global epoch cannot be
341         // advanced two steps ahead of it.
342         let new_epoch = global_epoch.successor();
343         self.epoch.store(new_epoch, Ordering::Release);
344         new_epoch
345     }
346 }
347 
348 /// Participant for garbage collection.
349 pub(crate) struct Local {
350     /// A node in the intrusive linked list of `Local`s.
351     entry: Entry,
352 
353     /// The local epoch.
354     epoch: AtomicEpoch,
355 
356     /// A reference to the global data.
357     ///
358     /// When all guards and handles get dropped, this reference is destroyed.
359     collector: UnsafeCell<ManuallyDrop<Collector>>,
360 
361     /// The local bag of deferred functions.
362     pub(crate) bag: UnsafeCell<Bag>,
363 
364     /// The number of guards keeping this participant pinned.
365     guard_count: Cell<usize>,
366 
367     /// The number of active handles.
368     handle_count: Cell<usize>,
369 
370     /// Total number of pinnings performed.
371     ///
372     /// This is just an auxiliary counter that sometimes kicks off collection.
373     pin_count: Cell<Wrapping<usize>>,
374 }
375 
376 // Make sure `Local` is less than or equal to 2048 bytes.
377 // https://github.com/crossbeam-rs/crossbeam/issues/551
378 #[cfg(not(crossbeam_sanitize))] // `crossbeam_sanitize` reduces the size of `Local`
379 #[test]
local_size()380 fn local_size() {
381     assert!(
382         core::mem::size_of::<Local>() <= 2048,
383         "An allocation of `Local` should be <= 2048 bytes."
384     );
385 }
386 
387 impl Local {
388     /// Number of pinnings after which a participant will execute some deferred functions from the
389     /// global queue.
390     const PINNINGS_BETWEEN_COLLECT: usize = 128;
391 
392     /// Registers a new `Local` in the provided `Global`.
register(collector: &Collector) -> LocalHandle393     pub(crate) fn register(collector: &Collector) -> LocalHandle {
394         unsafe {
395             // Since we dereference no pointers in this block, it is safe to use `unprotected`.
396 
397             let local = Owned::new(Local {
398                 entry: Entry::default(),
399                 epoch: AtomicEpoch::new(Epoch::starting()),
400                 collector: UnsafeCell::new(ManuallyDrop::new(collector.clone())),
401                 bag: UnsafeCell::new(Bag::new()),
402                 guard_count: Cell::new(0),
403                 handle_count: Cell::new(1),
404                 pin_count: Cell::new(Wrapping(0)),
405             })
406             .into_shared(unprotected());
407             collector.global.locals.insert(local, unprotected());
408             LocalHandle {
409                 local: local.as_raw(),
410             }
411         }
412     }
413 
414     /// Returns a reference to the `Global` in which this `Local` resides.
415     #[inline]
global(&self) -> &Global416     pub(crate) fn global(&self) -> &Global {
417         &self.collector().global
418     }
419 
420     /// Returns a reference to the `Collector` in which this `Local` resides.
421     #[inline]
collector(&self) -> &Collector422     pub(crate) fn collector(&self) -> &Collector {
423         self.collector.with(|c| unsafe { &**c })
424     }
425 
426     /// Returns `true` if the current participant is pinned.
427     #[inline]
is_pinned(&self) -> bool428     pub(crate) fn is_pinned(&self) -> bool {
429         self.guard_count.get() > 0
430     }
431 
432     /// Adds `deferred` to the thread-local bag.
433     ///
434     /// # Safety
435     ///
436     /// It should be safe for another thread to execute the given function.
defer(&self, mut deferred: Deferred, guard: &Guard)437     pub(crate) unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
438         let bag = self.bag.with_mut(|b| &mut *b);
439 
440         while let Err(d) = bag.try_push(deferred) {
441             self.global().push_bag(bag, guard);
442             deferred = d;
443         }
444     }
445 
flush(&self, guard: &Guard)446     pub(crate) fn flush(&self, guard: &Guard) {
447         let bag = self.bag.with_mut(|b| unsafe { &mut *b });
448 
449         if !bag.is_empty() {
450             self.global().push_bag(bag, guard);
451         }
452 
453         self.global().collect(guard);
454     }
455 
456     /// Pins the `Local`.
457     #[inline]
pin(&self) -> Guard458     pub(crate) fn pin(&self) -> Guard {
459         let guard = Guard { local: self };
460 
461         let guard_count = self.guard_count.get();
462         self.guard_count.set(guard_count.checked_add(1).unwrap());
463 
464         if guard_count == 0 {
465             let global_epoch = self.global().epoch.load(Ordering::Relaxed);
466             let new_epoch = global_epoch.pinned();
467 
468             // Now we must store `new_epoch` into `self.epoch` and execute a `SeqCst` fence.
469             // The fence makes sure that any future loads from `Atomic`s will not happen before
470             // this store.
471             if cfg!(any(target_arch = "x86", target_arch = "x86_64")) {
472                 // HACK(stjepang): On x86 architectures there are two different ways of executing
473                 // a `SeqCst` fence.
474                 //
475                 // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction.
476                 // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg`
477                 //    instruction.
478                 //
479                 // Both instructions have the effect of a full barrier, but benchmarks have shown
480                 // that the second one makes pinning faster in this particular case.  It is not
481                 // clear that this is permitted by the C++ memory model (SC fences work very
482                 // differently from SC accesses), but experimental evidence suggests that this
483                 // works fine.  Using inline assembly would be a viable (and correct) alternative,
484                 // but alas, that is not possible on stable Rust.
485                 let current = Epoch::starting();
486                 let res = self.epoch.compare_exchange(
487                     current,
488                     new_epoch,
489                     Ordering::SeqCst,
490                     Ordering::SeqCst,
491                 );
492                 debug_assert!(res.is_ok(), "participant was expected to be unpinned");
493                 // We add a compiler fence to make it less likely for LLVM to do something wrong
494                 // here.  Formally, this is not enough to get rid of data races; practically,
495                 // it should go a long way.
496                 atomic::compiler_fence(Ordering::SeqCst);
497             } else {
498                 self.epoch.store(new_epoch, Ordering::Relaxed);
499                 atomic::fence(Ordering::SeqCst);
500             }
501 
502             // Increment the pin counter.
503             let count = self.pin_count.get();
504             self.pin_count.set(count + Wrapping(1));
505 
506             // After every `PINNINGS_BETWEEN_COLLECT` try advancing the epoch and collecting
507             // some garbage.
508             if count.0 % Self::PINNINGS_BETWEEN_COLLECT == 0 {
509                 self.global().collect(&guard);
510             }
511         }
512 
513         guard
514     }
515 
516     /// Unpins the `Local`.
517     #[inline]
unpin(&self)518     pub(crate) fn unpin(&self) {
519         let guard_count = self.guard_count.get();
520         self.guard_count.set(guard_count - 1);
521 
522         if guard_count == 1 {
523             self.epoch.store(Epoch::starting(), Ordering::Release);
524 
525             if self.handle_count.get() == 0 {
526                 self.finalize();
527             }
528         }
529     }
530 
531     /// Unpins and then pins the `Local`.
532     #[inline]
repin(&self)533     pub(crate) fn repin(&self) {
534         let guard_count = self.guard_count.get();
535 
536         // Update the local epoch only if there's only one guard.
537         if guard_count == 1 {
538             let epoch = self.epoch.load(Ordering::Relaxed);
539             let global_epoch = self.global().epoch.load(Ordering::Relaxed).pinned();
540 
541             // Update the local epoch only if the global epoch is greater than the local epoch.
542             if epoch != global_epoch {
543                 // We store the new epoch with `Release` because we need to ensure any memory
544                 // accesses from the previous epoch do not leak into the new one.
545                 self.epoch.store(global_epoch, Ordering::Release);
546 
547                 // However, we don't need a following `SeqCst` fence, because it is safe for memory
548                 // accesses from the new epoch to be executed before updating the local epoch. At
549                 // worse, other threads will see the new epoch late and delay GC slightly.
550             }
551         }
552     }
553 
554     /// Increments the handle count.
555     #[inline]
acquire_handle(&self)556     pub(crate) fn acquire_handle(&self) {
557         let handle_count = self.handle_count.get();
558         debug_assert!(handle_count >= 1);
559         self.handle_count.set(handle_count + 1);
560     }
561 
562     /// Decrements the handle count.
563     #[inline]
release_handle(&self)564     pub(crate) fn release_handle(&self) {
565         let guard_count = self.guard_count.get();
566         let handle_count = self.handle_count.get();
567         debug_assert!(handle_count >= 1);
568         self.handle_count.set(handle_count - 1);
569 
570         if guard_count == 0 && handle_count == 1 {
571             self.finalize();
572         }
573     }
574 
575     /// Removes the `Local` from the global linked list.
576     #[cold]
finalize(&self)577     fn finalize(&self) {
578         debug_assert_eq!(self.guard_count.get(), 0);
579         debug_assert_eq!(self.handle_count.get(), 0);
580 
581         // Temporarily increment handle count. This is required so that the following call to `pin`
582         // doesn't call `finalize` again.
583         self.handle_count.set(1);
584         unsafe {
585             // Pin and move the local bag into the global queue. It's important that `push_bag`
586             // doesn't defer destruction on any new garbage.
587             let guard = &self.pin();
588             self.global()
589                 .push_bag(self.bag.with_mut(|b| &mut *b), guard);
590         }
591         // Revert the handle count back to zero.
592         self.handle_count.set(0);
593 
594         unsafe {
595             // Take the reference to the `Global` out of this `Local`. Since we're not protected
596             // by a guard at this time, it's crucial that the reference is read before marking the
597             // `Local` as deleted.
598             let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));
599 
600             // Mark this node in the linked list as deleted.
601             self.entry.delete(unprotected());
602 
603             // Finally, drop the reference to the global. Note that this might be the last reference
604             // to the `Global`. If so, the global data will be destroyed and all deferred functions
605             // in its queue will be executed.
606             drop(collector);
607         }
608     }
609 }
610 
611 impl IsElement<Local> for Local {
entry_of(local: &Local) -> &Entry612     fn entry_of(local: &Local) -> &Entry {
613         let entry_ptr = (local as *const Local as usize + offset_of!(Local, entry)) as *const Entry;
614         unsafe { &*entry_ptr }
615     }
616 
element_of(entry: &Entry) -> &Local617     unsafe fn element_of(entry: &Entry) -> &Local {
618         // offset_of! macro uses unsafe, but it's unnecessary in this context.
619         #[allow(unused_unsafe)]
620         let local_ptr = (entry as *const Entry as usize - offset_of!(Local, entry)) as *const Local;
621         &*local_ptr
622     }
623 
finalize(entry: &Entry, guard: &Guard)624     unsafe fn finalize(entry: &Entry, guard: &Guard) {
625         guard.defer_destroy(Shared::from(Self::element_of(entry) as *const _));
626     }
627 }
628 
629 #[cfg(all(test, not(crossbeam_loom)))]
630 mod tests {
631     use std::sync::atomic::{AtomicUsize, Ordering};
632 
633     use super::*;
634 
635     #[test]
check_defer()636     fn check_defer() {
637         static FLAG: AtomicUsize = AtomicUsize::new(0);
638         fn set() {
639             FLAG.store(42, Ordering::Relaxed);
640         }
641 
642         let d = Deferred::new(set);
643         assert_eq!(FLAG.load(Ordering::Relaxed), 0);
644         d.call();
645         assert_eq!(FLAG.load(Ordering::Relaxed), 42);
646     }
647 
648     #[test]
check_bag()649     fn check_bag() {
650         static FLAG: AtomicUsize = AtomicUsize::new(0);
651         fn incr() {
652             FLAG.fetch_add(1, Ordering::Relaxed);
653         }
654 
655         let mut bag = Bag::new();
656         assert!(bag.is_empty());
657 
658         for _ in 0..MAX_OBJECTS {
659             assert!(unsafe { bag.try_push(Deferred::new(incr)).is_ok() });
660             assert!(!bag.is_empty());
661             assert_eq!(FLAG.load(Ordering::Relaxed), 0);
662         }
663 
664         let result = unsafe { bag.try_push(Deferred::new(incr)) };
665         assert!(result.is_err());
666         assert!(!bag.is_empty());
667         assert_eq!(FLAG.load(Ordering::Relaxed), 0);
668 
669         drop(bag);
670         assert_eq!(FLAG.load(Ordering::Relaxed), MAX_OBJECTS);
671     }
672 }
673