1 // Copyright 2016 Amanieu d'Antras
2 //
3 // Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4 // http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5 // http://opensource.org/licenses/MIT>, at your option. This file may not be
6 // copied, modified, or distributed except according to those terms.
7 use crate::thread_parker::{ThreadParker, ThreadParkerT, UnparkHandleT};
8 use crate::util::UncheckedOptionExt;
9 use crate::word_lock::WordLock;
10 use core::{
11 cell::{Cell, UnsafeCell},
12 ptr,
13 sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
14 };
15 use smallvec::SmallVec;
16 use std::time::{Duration, Instant};
17
18 // Don't use Instant on wasm32-unknown-unknown; it just panics.
19 cfg_if::cfg_if! {
20 if #[cfg(all(
21 target_family = "wasm",
22 target_os = "unknown",
23 target_vendor = "unknown"
24 ))] {
25 #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
26 struct TimeoutInstant;
27 impl TimeoutInstant {
28 fn now() -> TimeoutInstant {
29 TimeoutInstant
30 }
31 }
32 impl core::ops::Add<Duration> for TimeoutInstant {
33 type Output = Self;
34 fn add(self, _rhs: Duration) -> Self::Output {
35 TimeoutInstant
36 }
37 }
38 } else {
39 use std::time::Instant as TimeoutInstant;
40 }
41 }
42
43 static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);
44
45 /// Holds the pointer to the currently active `HashTable`.
46 ///
47 /// # Safety
48 ///
49 /// Except for the initial value of null, it must always point to a valid `HashTable` instance.
50 /// Any `HashTable` this global static has ever pointed to must never be freed.
51 static HASHTABLE: AtomicPtr<HashTable> = AtomicPtr::new(ptr::null_mut());
52
53 // Even with 3x more buckets than threads, the memory overhead is still only
54 // a few hundred bytes per thread.
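// (Each `Bucket` below is `#[repr(align(64))]`, i.e. at least one cache line, so
// LOAD_FACTOR buckets per thread works out to very roughly 3 * 64 bytes of bucket
// storage per thread.)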
55 const LOAD_FACTOR: usize = 3;
56
57 struct HashTable {
58 // Hash buckets for the table
59 entries: Box<[Bucket]>,
60
61 // Number of bits used for the hash function
62 hash_bits: u32,
63
64 // Previous table. This is only kept to keep leak detectors happy.
65 _prev: *const HashTable,
66 }
67
68 impl HashTable {
69 #[inline]
70 fn new(num_threads: usize, prev: *const HashTable) -> Box<HashTable> {
71 let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
72 let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;
73
74 let now = TimeoutInstant::now();
75 let mut entries = Vec::with_capacity(new_size);
76 for i in 0..new_size {
77 // We must ensure the seed is not zero
78 entries.push(Bucket::new(now, i as u32 + 1));
79 }
80
81 Box::new(HashTable {
82 entries: entries.into_boxed_slice(),
83 hash_bits,
84 _prev: prev,
85 })
86 }
87 }
88
89 #[repr(align(64))]
90 struct Bucket {
91 // Lock protecting the queue
92 mutex: WordLock,
93
94 // Linked list of threads waiting on this bucket
95 queue_head: Cell<*const ThreadData>,
96 queue_tail: Cell<*const ThreadData>,
97
98 // Next time at which be_fair should be set
99 fair_timeout: UnsafeCell<FairTimeout>,
100 }
101
102 impl Bucket {
103 #[inline]
104 pub fn new(timeout: TimeoutInstant, seed: u32) -> Self {
105 Self {
106 mutex: WordLock::new(),
107 queue_head: Cell::new(ptr::null()),
108 queue_tail: Cell::new(ptr::null()),
109 fair_timeout: UnsafeCell::new(FairTimeout::new(timeout, seed)),
110 }
111 }
112 }
113
114 struct FairTimeout {
115 // Next time at which be_fair should be set
116 timeout: TimeoutInstant,
117
118 // the PRNG state for calculating the next timeout
119 seed: u32,
120 }
121
122 impl FairTimeout {
123 #[inline]
124 fn new(timeout: TimeoutInstant, seed: u32) -> FairTimeout {
125 FairTimeout { timeout, seed }
126 }
127
128 // Determine whether we should force a fair unlock, and update the timeout
129 #[inline]
130 fn should_timeout(&mut self) -> bool {
131 let now = TimeoutInstant::now();
132 if now > self.timeout {
133 // Time between 0 and 1ms.
134 let nanos = self.gen_u32() % 1_000_000;
135 self.timeout = now + Duration::new(0, nanos);
136 true
137 } else {
138 false
139 }
140 }
141
142 // Pseudorandom number generator from the "Xorshift RNGs" paper by George Marsaglia.
143 fn gen_u32(&mut self) -> u32 {
144 self.seed ^= self.seed << 13;
145 self.seed ^= self.seed >> 17;
146 self.seed ^= self.seed << 5;
147 self.seed
148 }
149 }
150
151 struct ThreadData {
152 parker: ThreadParker,
153
154 // Key that this thread is sleeping on. This may change if the thread is
155 // requeued to a different key.
156 key: AtomicUsize,
157
158 // Linked list of parked threads in a bucket
159 next_in_queue: Cell<*const ThreadData>,
160
161 // UnparkToken passed to this thread when it is unparked
162 unpark_token: Cell<UnparkToken>,
163
164 // ParkToken value set by the thread when it was parked
165 park_token: Cell<ParkToken>,
166
167 // Is the thread parked with a timeout?
168 parked_with_timeout: Cell<bool>,
169
170 // Extra data for deadlock detection
171 #[cfg(feature = "deadlock_detection")]
172 deadlock_data: deadlock::DeadlockData,
173 }
174
175 impl ThreadData {
176 fn new() -> ThreadData {
177 // Keep track of the total number of live ThreadData objects and resize
178 // the hash table accordingly.
179 let num_threads = NUM_THREADS.fetch_add(1, Ordering::Relaxed) + 1;
180 grow_hashtable(num_threads);
181
182 ThreadData {
183 parker: ThreadParker::new(),
184 key: AtomicUsize::new(0),
185 next_in_queue: Cell::new(ptr::null()),
186 unpark_token: Cell::new(DEFAULT_UNPARK_TOKEN),
187 park_token: Cell::new(DEFAULT_PARK_TOKEN),
188 parked_with_timeout: Cell::new(false),
189 #[cfg(feature = "deadlock_detection")]
190 deadlock_data: deadlock::DeadlockData::new(),
191 }
192 }
193 }
194
195 // Invokes the given closure with a reference to the current thread `ThreadData`.
196 #[inline(always)]
197 fn with_thread_data<T>(f: impl FnOnce(&ThreadData) -> T) -> T {
198 // Unlike word_lock::ThreadData, parking_lot::ThreadData is always expensive
199 // to construct. Try to use a thread-local version if possible. Otherwise just
200 // create a ThreadData on the stack
201 let mut thread_data_storage = None;
202 thread_local!(static THREAD_DATA: ThreadData = ThreadData::new());
203 let thread_data_ptr = THREAD_DATA
204 .try_with(|x| x as *const ThreadData)
205 .unwrap_or_else(|_| thread_data_storage.get_or_insert_with(ThreadData::new));
206
207 f(unsafe { &*thread_data_ptr })
208 }
209
210 impl Drop for ThreadData {
211 fn drop(&mut self) {
212 NUM_THREADS.fetch_sub(1, Ordering::Relaxed);
213 }
214 }
215
216 /// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
217 /// The reference is valid forever. However, the `HashTable` it references might become stale
218 /// at any point, meaning it still exists but is no longer the instance in active use.
219 #[inline]
220 fn get_hashtable() -> &'static HashTable {
221 let table = HASHTABLE.load(Ordering::Acquire);
222
223 // If there is no table, create one
224 if table.is_null() {
225 create_hashtable()
226 } else {
227 // SAFETY: when not null, `HASHTABLE` always points to a `HashTable` that is never freed.
228 unsafe { &*table }
229 }
230 }
231
232 /// Returns a reference to the latest hash table, creating one if it doesn't exist yet.
233 /// The reference is valid forever. However, the `HashTable` it references might become stale
234 /// at any point, meaning it still exists but is no longer the instance in active use.
235 #[cold]
236 fn create_hashtable() -> &'static HashTable {
237 let new_table = Box::into_raw(HashTable::new(LOAD_FACTOR, ptr::null()));
238
239 // If this fails then it means some other thread created the hash table first.
240 let table = match HASHTABLE.compare_exchange(
241 ptr::null_mut(),
242 new_table,
243 Ordering::AcqRel,
244 Ordering::Acquire,
245 ) {
246 Ok(_) => new_table,
247 Err(old_table) => {
248 // Free the table we created
249 // SAFETY: `new_table` is created from `Box::into_raw` above and only freed here.
250 unsafe {
251 Box::from_raw(new_table);
252 }
253 old_table
254 }
255 };
256 // SAFETY: The `HashTable` behind `table` is never freed. It is either the table pointer we
257 // created here, or it is one loaded from `HASHTABLE`.
258 unsafe { &*table }
259 }
260
261 // Grow the hash table so that it is big enough for the given number of threads.
262 // This isn't performance-critical since it is only done when a ThreadData is
263 // created, which only happens once per thread.
264 fn grow_hashtable(num_threads: usize) {
265 // Lock all buckets in the existing table and get a reference to it
266 let old_table = loop {
267 let table = get_hashtable();
268
269 // Check if we need to resize the existing table
270 if table.entries.len() >= LOAD_FACTOR * num_threads {
271 return;
272 }
273
274 // Lock all buckets in the old table
275 for bucket in &table.entries[..] {
276 bucket.mutex.lock();
277 }
278
279 // Now check if our table is still the latest one. Another thread could
280 // have grown the hash table between us reading HASHTABLE and locking
281 // the buckets.
282 if HASHTABLE.load(Ordering::Relaxed) == table as *const _ as *mut _ {
283 break table;
284 }
285
286 // Unlock buckets and try again
287 for bucket in &table.entries[..] {
288 // SAFETY: We hold the lock here, as required
289 unsafe { bucket.mutex.unlock() };
290 }
291 };
292
293 // Create the new table
294 let mut new_table = HashTable::new(num_threads, old_table);
295
296 // Move the entries from the old table to the new one
297 for bucket in &old_table.entries[..] {
298 // SAFETY: The park, unpark* and check_wait_graph_fast functions create only correct linked
299 // lists. All `ThreadData` instances in these lists will remain valid as long as they are
300 // present in the lists, meaning as long as their threads are parked.
301 unsafe { rehash_bucket_into(bucket, &mut new_table) };
302 }
303
304 // Publish the new table. No races are possible at this point because
305 // any other thread trying to grow the hash table is blocked on the bucket
306 // locks in the old table.
307 HASHTABLE.store(Box::into_raw(new_table), Ordering::Release);
308
309 // Unlock all buckets in the old table
310 for bucket in &old_table.entries[..] {
311 // SAFETY: We hold the lock here, as required
312 unsafe { bucket.mutex.unlock() };
313 }
314 }
315
316 /// Iterate through all `ThreadData` objects in the bucket and insert them into the given table
317 /// in the bucket their key corresponds to for this table.
318 ///
319 /// # Safety
320 ///
321 /// The given `bucket` must have a correctly constructed linked list under `queue_head`, containing
322 /// `ThreadData` instances that must stay valid at least as long as the given `table` is in use.
323 ///
324 /// The given `table` must only contain buckets with correctly constructed linked lists.
325 unsafe fn rehash_bucket_into(bucket: &'static Bucket, table: &mut HashTable) {
326 let mut current: *const ThreadData = bucket.queue_head.get();
327 while !current.is_null() {
328 let next = (*current).next_in_queue.get();
329 let hash = hash((*current).key.load(Ordering::Relaxed), table.hash_bits);
330 if table.entries[hash].queue_tail.get().is_null() {
331 table.entries[hash].queue_head.set(current);
332 } else {
333 (*table.entries[hash].queue_tail.get())
334 .next_in_queue
335 .set(current);
336 }
337 table.entries[hash].queue_tail.set(current);
338 (*current).next_in_queue.set(ptr::null());
339 current = next;
340 }
341 }
342
343 // Hash function for addresses
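// The multipliers below are 2^32 / phi and 2^64 / phi (phi = the golden ratio),
// i.e. Fibonacci hashing: the key is multiplied and the top `bits` bits of the
// result are used as the bucket index.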
344 #[cfg(target_pointer_width = "32")]
345 #[inline]
346 fn hash(key: usize, bits: u32) -> usize {
347 key.wrapping_mul(0x9E3779B9) >> (32 - bits)
348 }
349 #[cfg(target_pointer_width = "64")]
350 #[inline]
351 fn hash(key: usize, bits: u32) -> usize {
352 key.wrapping_mul(0x9E3779B97F4A7C15) >> (64 - bits)
353 }
354
355 /// Locks the bucket for the given key and returns a reference to it.
356 /// The returned bucket must be unlocked again in order to not cause deadlocks.
357 #[inline]
358 fn lock_bucket(key: usize) -> &'static Bucket {
359 loop {
360 let hashtable = get_hashtable();
361
362 let hash = hash(key, hashtable.hash_bits);
363 let bucket = &hashtable.entries[hash];
364
365 // Lock the bucket
366 bucket.mutex.lock();
367
368 // If no other thread has rehashed the table before we grabbed the lock
369 // then we are good to go! The lock we grabbed prevents any rehashes.
370 if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
371 return bucket;
372 }
373
374 // Unlock the bucket and try again
375 // SAFETY: We hold the lock here, as required
376 unsafe { bucket.mutex.unlock() };
377 }
378 }
379
380 /// Locks the bucket for the given key and returns a reference to it. But checks that the key
381 /// hasn't been changed in the meantime due to a requeue.
382 /// The returned bucket must be unlocked again in order to not cause deadlocks.
383 #[inline]
384 fn lock_bucket_checked(key: &AtomicUsize) -> (usize, &'static Bucket) {
385 loop {
386 let hashtable = get_hashtable();
387 let current_key = key.load(Ordering::Relaxed);
388
389 let hash = hash(current_key, hashtable.hash_bits);
390 let bucket = &hashtable.entries[hash];
391
392 // Lock the bucket
393 bucket.mutex.lock();
394
395 // Check that both the hash table and key are correct while the bucket
396 // is locked. Note that the key can't change once we locked the proper
397 // bucket for it, so we just keep trying until we have the correct key.
398 if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _
399 && key.load(Ordering::Relaxed) == current_key
400 {
401 return (current_key, bucket);
402 }
403
404 // Unlock the bucket and try again
405 // SAFETY: We hold the lock here, as required
406 unsafe { bucket.mutex.unlock() };
407 }
408 }
409
410 /// Locks the two buckets for the given pair of keys and returns references to them.
411 /// The returned buckets must be unlocked again in order to not cause deadlocks.
412 ///
413 /// If both keys hash to the same value, both returned references will be to the same bucket. Be
414 /// careful to only unlock it once in this case, always use `unlock_bucket_pair`.
415 #[inline]
416 fn lock_bucket_pair(key1: usize, key2: usize) -> (&'static Bucket, &'static Bucket) {
417 loop {
418 let hashtable = get_hashtable();
419
420 let hash1 = hash(key1, hashtable.hash_bits);
421 let hash2 = hash(key2, hashtable.hash_bits);
422
423 // Get the bucket at the lowest hash/index first
424 let bucket1 = if hash1 <= hash2 {
425 &hashtable.entries[hash1]
426 } else {
427 &hashtable.entries[hash2]
428 };
429
430 // Lock the first bucket
431 bucket1.mutex.lock();
432
433 // If no other thread has rehashed the table before we grabbed the lock
434 // then we are good to go! The lock we grabbed prevents any rehashes.
435 if HASHTABLE.load(Ordering::Relaxed) == hashtable as *const _ as *mut _ {
436 // Now lock the second bucket and return the two buckets
437 if hash1 == hash2 {
438 return (bucket1, bucket1);
439 } else if hash1 < hash2 {
440 let bucket2 = &hashtable.entries[hash2];
441 bucket2.mutex.lock();
442 return (bucket1, bucket2);
443 } else {
444 let bucket2 = &hashtable.entries[hash1];
445 bucket2.mutex.lock();
446 return (bucket2, bucket1);
447 }
448 }
449
450 // Unlock the bucket and try again
451 // SAFETY: We hold the lock here, as required
452 unsafe { bucket1.mutex.unlock() };
453 }
454 }
455
456 /// Unlock a pair of buckets
457 ///
458 /// # Safety
459 ///
460 /// Both buckets must be locked
461 #[inline]
462 unsafe fn unlock_bucket_pair(bucket1: &Bucket, bucket2: &Bucket) {
463 bucket1.mutex.unlock();
464 if !ptr::eq(bucket1, bucket2) {
465 bucket2.mutex.unlock();
466 }
467 }
468
469 /// Result of a park operation.
470 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
471 pub enum ParkResult {
472 /// We were unparked by another thread with the given token.
473 Unparked(UnparkToken),
474
475 /// The validation callback returned false.
476 Invalid,
477
478 /// The timeout expired.
479 TimedOut,
480 }
481
482 impl ParkResult {
483 /// Returns true if we were unparked by another thread.
484 #[inline]
485 pub fn is_unparked(self) -> bool {
486 if let ParkResult::Unparked(_) = self {
487 true
488 } else {
489 false
490 }
491 }
492 }
493
494 /// Result of an unpark operation.
495 #[derive(Copy, Clone, Default, Eq, PartialEq, Debug)]
496 pub struct UnparkResult {
497 /// The number of threads that were unparked.
498 pub unparked_threads: usize,
499
500 /// The number of threads that were requeued.
501 pub requeued_threads: usize,
502
503 /// Whether there are any threads remaining in the queue. This is only set to
504 /// true if a thread was unparked.
505 pub have_more_threads: bool,
506
507 /// This is set to true on average once every 0.5ms for any given key. It
508 /// should be used to switch to a fair unlocking mechanism for a particular
509 /// unlock.
510 pub be_fair: bool,
511
512 /// Private field so new fields can be added without breakage.
513 _sealed: (),
514 }
515
516 /// Operation that `unpark_requeue` should perform.
517 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
518 pub enum RequeueOp {
519 /// Abort the operation without doing anything.
520 Abort,
521
522 /// Unpark one thread and requeue the rest onto the target queue.
523 UnparkOneRequeueRest,
524
525 /// Requeue all threads onto the target queue.
526 RequeueAll,
527
528 /// Unpark one thread and leave the rest parked. No requeuing is done.
529 UnparkOne,
530
531 /// Requeue one thread and leave the rest parked on the original queue.
532 RequeueOne,
533 }
534
535 /// Operation that `unpark_filter` should perform for each thread.
536 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
537 pub enum FilterOp {
538 /// Unpark the thread and continue scanning the list of parked threads.
539 Unpark,
540
541 /// Don't unpark the thread and continue scanning the list of parked threads.
542 Skip,
543
544 /// Don't unpark the thread and stop scanning the list of parked threads.
545 Stop,
546 }
547
548 /// A value which is passed from an unparker to a parked thread.
549 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
550 pub struct UnparkToken(pub usize);
551
552 /// A value associated with a parked thread which can be used by `unpark_filter`.
553 #[derive(Copy, Clone, Eq, PartialEq, Debug)]
554 pub struct ParkToken(pub usize);
555
556 /// A default unpark token to use.
557 pub const DEFAULT_UNPARK_TOKEN: UnparkToken = UnparkToken(0);
558
559 /// A default park token to use.
560 pub const DEFAULT_PARK_TOKEN: ParkToken = ParkToken(0);
561
562 /// Parks the current thread in the queue associated with the given key.
563 ///
564 /// The `validate` function is called while the queue is locked and can abort
565 /// the operation by returning false. If `validate` returns true then the
566 /// current thread is appended to the queue and the queue is unlocked.
567 ///
568 /// The `before_sleep` function is called after the queue is unlocked but before
569 /// the thread is put to sleep. The thread will then sleep until it is unparked
570 /// or the given timeout is reached.
571 ///
572 /// The `timed_out` function is also called while the queue is locked, but only
573 /// if the timeout was reached. It is passed the key of the queue it was in when
574 /// it timed out, which may be different from the original key if
575 /// `unpark_requeue` was called. It is also passed a bool which indicates
576 /// whether it was the last thread in the queue.
577 ///
578 /// # Safety
579 ///
580 /// You should only call this function with an address that you control, since
581 /// you could otherwise interfere with the operation of other synchronization
582 /// primitives.
583 ///
584 /// The `validate` and `timed_out` functions are called while the queue is
585 /// locked and must not panic or call into any function in `parking_lot`.
586 ///
587 /// The `before_sleep` function is called outside the queue lock and is allowed
588 /// to call `unpark_one`, `unpark_all`, `unpark_requeue` or `unpark_filter`, but
589 /// it is not allowed to call `park` or panic.
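///
/// # Example
///
/// An illustrative sketch (not from the upstream documentation) of a one-shot
/// event built on `park`/`unpark_all`, assuming the usual `parking_lot_core`
/// re-exports of these items. The `Event` type and its field are made up for
/// this example.
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// struct Event {
///     signaled: AtomicBool,
/// }
///
/// impl Event {
///     fn wait(&self) {
///         let key = &self.signaled as *const _ as usize;
///         while !self.signaled.load(Ordering::Acquire) {
///             // SAFETY: we park on an address we own, and the closures obey the
///             // rules documented above (no panics, no reentrant parking_lot calls).
///             unsafe {
///                 parking_lot_core::park(
///                     key,
///                     || !self.signaled.load(Ordering::Acquire), // validate: re-check under the queue lock
///                     || {},                                     // before_sleep
///                     |_key, _was_last| {},                      // timed_out (unused, no timeout given)
///                     parking_lot_core::DEFAULT_PARK_TOKEN,
///                     None,
///                 );
///             }
///         }
///     }
///
///     fn set(&self) {
///         self.signaled.store(true, Ordering::Release);
///         let key = &self.signaled as *const _ as usize;
///         // SAFETY: same key as in `wait`; wakes every thread parked there.
///         unsafe {
///             parking_lot_core::unpark_all(key, parking_lot_core::DEFAULT_UNPARK_TOKEN);
///         }
///     }
/// }
/// ```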
590 #[inline]
591 pub unsafe fn park(
592 key: usize,
593 validate: impl FnOnce() -> bool,
594 before_sleep: impl FnOnce(),
595 timed_out: impl FnOnce(usize, bool),
596 park_token: ParkToken,
597 timeout: Option<Instant>,
598 ) -> ParkResult {
599 // Grab our thread data, this also ensures that the hash table exists
600 with_thread_data(|thread_data| {
601 // Lock the bucket for the given key
602 let bucket = lock_bucket(key);
603
604 // If the validation function fails, just return
605 if !validate() {
606 // SAFETY: We hold the lock here, as required
607 bucket.mutex.unlock();
608 return ParkResult::Invalid;
609 }
610
611 // Append our thread data to the queue and unlock the bucket
612 thread_data.parked_with_timeout.set(timeout.is_some());
613 thread_data.next_in_queue.set(ptr::null());
614 thread_data.key.store(key, Ordering::Relaxed);
615 thread_data.park_token.set(park_token);
616 thread_data.parker.prepare_park();
617 if !bucket.queue_head.get().is_null() {
618 (*bucket.queue_tail.get()).next_in_queue.set(thread_data);
619 } else {
620 bucket.queue_head.set(thread_data);
621 }
622 bucket.queue_tail.set(thread_data);
623 // SAFETY: We hold the lock here, as required
624 bucket.mutex.unlock();
625
626 // Invoke the pre-sleep callback
627 before_sleep();
628
629 // Park our thread and determine whether we were woken up by an unpark
630 // or by our timeout. Note that this isn't precise: we can still be
631 // unparked since we are still in the queue.
632 let unparked = match timeout {
633 Some(timeout) => thread_data.parker.park_until(timeout),
634 None => {
635 thread_data.parker.park();
636 // call deadlock detection on_unpark hook
637 deadlock::on_unpark(thread_data);
638 true
639 }
640 };
641
642 // If we were unparked, return now
643 if unparked {
644 return ParkResult::Unparked(thread_data.unpark_token.get());
645 }
646
647 // Lock our bucket again. Note that the hashtable may have been rehashed in
648 // the meantime. Our key may also have changed if we were requeued.
649 let (key, bucket) = lock_bucket_checked(&thread_data.key);
650
651 // Now we need to check again if we were unparked or timed out. Unlike the
652 // last check this is precise because we hold the bucket lock.
653 if !thread_data.parker.timed_out() {
654 // SAFETY: We hold the lock here, as required
655 bucket.mutex.unlock();
656 return ParkResult::Unparked(thread_data.unpark_token.get());
657 }
658
659 // We timed out, so we now need to remove our thread from the queue
660 let mut link = &bucket.queue_head;
661 let mut current = bucket.queue_head.get();
662 let mut previous = ptr::null();
663 let mut was_last_thread = true;
664 while !current.is_null() {
665 if current == thread_data {
666 let next = (*current).next_in_queue.get();
667 link.set(next);
668 if bucket.queue_tail.get() == current {
669 bucket.queue_tail.set(previous);
670 } else {
671 // Scan the rest of the queue to see if there are any other
672 // entries with the given key.
673 let mut scan = next;
674 while !scan.is_null() {
675 if (*scan).key.load(Ordering::Relaxed) == key {
676 was_last_thread = false;
677 break;
678 }
679 scan = (*scan).next_in_queue.get();
680 }
681 }
682
683 // Callback to indicate that we timed out, and whether we were the
684 // last thread on the queue.
685 timed_out(key, was_last_thread);
686 break;
687 } else {
688 if (*current).key.load(Ordering::Relaxed) == key {
689 was_last_thread = false;
690 }
691 link = &(*current).next_in_queue;
692 previous = current;
693 current = link.get();
694 }
695 }
696
697 // There should be no way for our thread to have been removed from the queue
698 // if we timed out.
699 debug_assert!(!current.is_null());
700
701 // Unlock the bucket, we are done
702 // SAFETY: We hold the lock here, as required
703 bucket.mutex.unlock();
704 ParkResult::TimedOut
705 })
706 }
707
708 /// Unparks one thread from the queue associated with the given key.
709 ///
710 /// The `callback` function is called while the queue is locked and before the
711 /// target thread is woken up. The `UnparkResult` argument to the function
712 /// indicates whether a thread was found in the queue and whether this was the
713 /// last thread in the queue. This value is also returned by `unpark_one`.
714 ///
715 /// The `callback` function should return an `UnparkToken` value which will be
716 /// passed to the thread that is unparked. If no thread is unparked then the
717 /// returned value is ignored.
718 ///
719 /// # Safety
720 ///
721 /// You should only call this function with an address that you control, since
722 /// you could otherwise interfere with the operation of other synchronization
723 /// primitives.
724 ///
725 /// The `callback` function is called while the queue is locked and must not
726 /// panic or call into any function in `parking_lot`.
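///
/// # Example
///
/// A rough sketch (hypothetical names, not the crate's documented API) of how a
/// word-sized mutex could hand the lock to a single waiter from its slow unlock
/// path, using the `UnparkResult` passed to the callback; `LOCKED_BIT`,
/// `PARKED_BIT` and the token values are made up for illustration.
///
/// ```ignore
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use parking_lot_core::UnparkToken;
///
/// const LOCKED_BIT: usize = 1;
/// const PARKED_BIT: usize = 2;
/// const TOKEN_NORMAL: UnparkToken = UnparkToken(0);
/// const TOKEN_HANDOFF: UnparkToken = UnparkToken(1);
///
/// unsafe fn unlock_slow(state: &AtomicUsize) {
///     let key = state as *const _ as usize;
///     parking_lot_core::unpark_one(key, |result| {
///         if result.unparked_threads != 0 && result.be_fair {
///             // Fair handoff: keep LOCKED_BIT set so the woken thread owns the
///             // lock directly; drop PARKED_BIT if it was the last waiter.
///             if !result.have_more_threads {
///                 state.store(LOCKED_BIT, Ordering::Relaxed);
///             }
///             return TOKEN_HANDOFF;
///         }
///         // Normal path: release the lock, remembering whether waiters remain.
///         if result.have_more_threads {
///             state.store(PARKED_BIT, Ordering::Release);
///         } else {
///             state.store(0, Ordering::Release);
///         }
///         TOKEN_NORMAL
///     });
/// }
/// ```
///
/// The woken thread would then inspect the `UnparkToken` it received: on
/// `TOKEN_HANDOFF` it already owns the lock, otherwise it loops and tries to
/// acquire the lock again.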
727 #[inline]
728 pub unsafe fn unpark_one(
729 key: usize,
730 callback: impl FnOnce(UnparkResult) -> UnparkToken,
731 ) -> UnparkResult {
732 // Lock the bucket for the given key
733 let bucket = lock_bucket(key);
734
735 // Find a thread with a matching key and remove it from the queue
736 let mut link = &bucket.queue_head;
737 let mut current = bucket.queue_head.get();
738 let mut previous = ptr::null();
739 let mut result = UnparkResult::default();
740 while !current.is_null() {
741 if (*current).key.load(Ordering::Relaxed) == key {
742 // Remove the thread from the queue
743 let next = (*current).next_in_queue.get();
744 link.set(next);
745 if bucket.queue_tail.get() == current {
746 bucket.queue_tail.set(previous);
747 } else {
748 // Scan the rest of the queue to see if there are any other
749 // entries with the given key.
750 let mut scan = next;
751 while !scan.is_null() {
752 if (*scan).key.load(Ordering::Relaxed) == key {
753 result.have_more_threads = true;
754 break;
755 }
756 scan = (*scan).next_in_queue.get();
757 }
758 }
759
760 // Invoke the callback before waking up the thread
761 result.unparked_threads = 1;
762 result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
763 let token = callback(result);
764
765 // Set the token for the target thread
766 (*current).unpark_token.set(token);
767
768 // This is a bit tricky: we first lock the ThreadParker to prevent
769 // the thread from exiting and freeing its ThreadData if its wait
770 // times out. Then we unlock the queue since we don't want to keep
771 // the queue locked while we perform a system call. Finally we wake
772 // up the parked thread.
773 let handle = (*current).parker.unpark_lock();
774 // SAFETY: We hold the lock here, as required
775 bucket.mutex.unlock();
776 handle.unpark();
777
778 return result;
779 } else {
780 link = &(*current).next_in_queue;
781 previous = current;
782 current = link.get();
783 }
784 }
785
786 // No threads with a matching key were found in the bucket
787 callback(result);
788 // SAFETY: We hold the lock here, as required
789 bucket.mutex.unlock();
790 result
791 }
792
793 /// Unparks all threads in the queue associated with the given key.
794 ///
795 /// The given `UnparkToken` is passed to all unparked threads.
796 ///
797 /// This function returns the number of threads that were unparked.
798 ///
799 /// # Safety
800 ///
801 /// You should only call this function with an address that you control, since
802 /// you could otherwise interfere with the operation of other synchronization
803 /// primitives.
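///
/// # Example
///
/// A small sketch (the `open_gate` helper is hypothetical, assuming the usual
/// `parking_lot_core` re-exports) that opens a "gate" flag and then wakes every
/// thread parked on its address:
///
/// ```ignore
/// use std::sync::atomic::{AtomicBool, Ordering};
///
/// fn open_gate(gate: &AtomicBool) -> usize {
///     gate.store(true, Ordering::Release);
///     let key = gate as *const _ as usize;
///     // SAFETY: `key` is an address we control; waiters re-check the flag on wakeup.
///     unsafe { parking_lot_core::unpark_all(key, parking_lot_core::DEFAULT_UNPARK_TOKEN) }
/// }
/// ```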
804 #[inline]
805 pub unsafe fn unpark_all(key: usize, unpark_token: UnparkToken) -> usize {
806 // Lock the bucket for the given key
807 let bucket = lock_bucket(key);
808
809 // Remove all threads with the given key in the bucket
810 let mut link = &bucket.queue_head;
811 let mut current = bucket.queue_head.get();
812 let mut previous = ptr::null();
813 let mut threads = SmallVec::<[_; 8]>::new();
814 while !current.is_null() {
815 if (*current).key.load(Ordering::Relaxed) == key {
816 // Remove the thread from the queue
817 let next = (*current).next_in_queue.get();
818 link.set(next);
819 if bucket.queue_tail.get() == current {
820 bucket.queue_tail.set(previous);
821 }
822
823 // Set the token for the target thread
824 (*current).unpark_token.set(unpark_token);
825
826 // Don't wake up threads while holding the queue lock. See comment
827 // in unpark_one. For now just record which threads we need to wake
828 // up.
829 threads.push((*current).parker.unpark_lock());
830 current = next;
831 } else {
832 link = &(*current).next_in_queue;
833 previous = current;
834 current = link.get();
835 }
836 }
837
838 // Unlock the bucket
839 // SAFETY: We hold the lock here, as required
840 bucket.mutex.unlock();
841
842 // Now that we are outside the lock, wake up all the threads that we removed
843 // from the queue.
844 let num_threads = threads.len();
845 for handle in threads.into_iter() {
846 handle.unpark();
847 }
848
849 num_threads
850 }
851
852 /// Removes all threads from the queue associated with `key_from`, optionally
853 /// unparks the first one and requeues the rest onto the queue associated with
854 /// `key_to`.
855 ///
856 /// The `validate` function is called while both queues are locked. Its return
857 /// value will determine which operation is performed, or whether the operation
858 /// should be aborted. See `RequeueOp` for details about the different possible
859 /// return values.
860 ///
861 /// The `callback` function is also called while both queues are locked. It is
862 /// passed the `RequeueOp` returned by `validate` and an `UnparkResult`
863 /// indicating whether a thread was unparked and whether there are threads still
864 /// parked in the new queue. This `UnparkResult` value is also returned by
865 /// `unpark_requeue`.
866 ///
867 /// The `callback` function should return an `UnparkToken` value which will be
868 /// passed to the thread that is unparked. If no thread is unparked then the
869 /// returned value is ignored.
870 ///
871 /// # Safety
872 ///
873 /// You should only call this function with an address that you control, since
874 /// you could otherwise interfere with the operation of other synchronization
875 /// primitives.
876 ///
877 /// The `validate` and `callback` functions are called while the queue is locked
878 /// and must not panic or call into any function in `parking_lot`.
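///
/// # Example
///
/// A rough sketch of a condition-variable style `notify_one` (the `LOCKED_BIT`
/// and `PARKED_BIT` constants and the mutex layout are hypothetical, not the
/// crate's documented API): if the associated mutex is currently held, the
/// waiter is requeued onto the mutex's queue instead of being woken just to
/// block again.
///
/// ```ignore
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use parking_lot_core::{RequeueOp, DEFAULT_UNPARK_TOKEN};
///
/// const LOCKED_BIT: usize = 1;
/// const PARKED_BIT: usize = 2;
///
/// unsafe fn notify_one(condvar_key: usize, mutex_state: &AtomicUsize) {
///     let mutex_key = mutex_state as *const _ as usize;
///     parking_lot_core::unpark_requeue(
///         condvar_key,
///         mutex_key,
///         || {
///             // validate: called with both queues locked
///             if mutex_state.load(Ordering::Relaxed) & LOCKED_BIT != 0 {
///                 RequeueOp::RequeueOne
///             } else {
///                 RequeueOp::UnparkOne
///             }
///         },
///         |op, result| {
///             // If a thread was moved onto the mutex queue, make sure the mutex
///             // remembers that it now has parked waiters.
///             if op == RequeueOp::RequeueOne && result.requeued_threads != 0 {
///                 mutex_state.fetch_or(PARKED_BIT, Ordering::Relaxed);
///             }
///             DEFAULT_UNPARK_TOKEN
///         },
///     );
/// }
/// ```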
879 #[inline]
880 pub unsafe fn unpark_requeue(
881 key_from: usize,
882 key_to: usize,
883 validate: impl FnOnce() -> RequeueOp,
884 callback: impl FnOnce(RequeueOp, UnparkResult) -> UnparkToken,
885 ) -> UnparkResult {
886 // Lock the two buckets for the given key
887 let (bucket_from, bucket_to) = lock_bucket_pair(key_from, key_to);
888
889 // If the validation function fails, just return
890 let mut result = UnparkResult::default();
891 let op = validate();
892 if op == RequeueOp::Abort {
893 // SAFETY: Both buckets are locked, as required.
894 unlock_bucket_pair(bucket_from, bucket_to);
895 return result;
896 }
897
898 // Remove all threads with the given key in the source bucket
899 let mut link = &bucket_from.queue_head;
900 let mut current = bucket_from.queue_head.get();
901 let mut previous = ptr::null();
902 let mut requeue_threads: *const ThreadData = ptr::null();
903 let mut requeue_threads_tail: *const ThreadData = ptr::null();
904 let mut wakeup_thread = None;
905 while !current.is_null() {
906 if (*current).key.load(Ordering::Relaxed) == key_from {
907 // Remove the thread from the queue
908 let next = (*current).next_in_queue.get();
909 link.set(next);
910 if bucket_from.queue_tail.get() == current {
911 bucket_from.queue_tail.set(previous);
912 }
913
914 // Prepare the first thread for wakeup and requeue the rest.
915 if (op == RequeueOp::UnparkOneRequeueRest || op == RequeueOp::UnparkOne)
916 && wakeup_thread.is_none()
917 {
918 wakeup_thread = Some(current);
919 result.unparked_threads = 1;
920 } else {
921 if !requeue_threads.is_null() {
922 (*requeue_threads_tail).next_in_queue.set(current);
923 } else {
924 requeue_threads = current;
925 }
926 requeue_threads_tail = current;
927 (*current).key.store(key_to, Ordering::Relaxed);
928 result.requeued_threads += 1;
929 }
930 if op == RequeueOp::UnparkOne || op == RequeueOp::RequeueOne {
931 // Scan the rest of the queue to see if there are any other
932 // entries with the given key.
933 let mut scan = next;
934 while !scan.is_null() {
935 if (*scan).key.load(Ordering::Relaxed) == key_from {
936 result.have_more_threads = true;
937 break;
938 }
939 scan = (*scan).next_in_queue.get();
940 }
941 break;
942 }
943 current = next;
944 } else {
945 link = &(*current).next_in_queue;
946 previous = current;
947 current = link.get();
948 }
949 }
950
951 // Add the requeued threads to the destination bucket
952 if !requeue_threads.is_null() {
953 (*requeue_threads_tail).next_in_queue.set(ptr::null());
954 if !bucket_to.queue_head.get().is_null() {
955 (*bucket_to.queue_tail.get())
956 .next_in_queue
957 .set(requeue_threads);
958 } else {
959 bucket_to.queue_head.set(requeue_threads);
960 }
961 bucket_to.queue_tail.set(requeue_threads_tail);
962 }
963
964 // Invoke the callback before waking up the thread
965 if result.unparked_threads != 0 {
966 result.be_fair = (*bucket_from.fair_timeout.get()).should_timeout();
967 }
968 let token = callback(op, result);
969
970 // See comment in unpark_one for why we mess with the locking
971 if let Some(wakeup_thread) = wakeup_thread {
972 (*wakeup_thread).unpark_token.set(token);
973 let handle = (*wakeup_thread).parker.unpark_lock();
974 // SAFETY: Both buckets are locked, as required.
975 unlock_bucket_pair(bucket_from, bucket_to);
976 handle.unpark();
977 } else {
978 // SAFETY: Both buckets are locked, as required.
979 unlock_bucket_pair(bucket_from, bucket_to);
980 }
981
982 result
983 }
984
985 /// Unparks a number of threads from the front of the queue associated with
986 /// `key` depending on the results of a filter function which inspects the
987 /// `ParkToken` associated with each thread.
988 ///
989 /// The `filter` function is called for each thread in the queue or until
990 /// `FilterOp::Stop` is returned. This function is passed the `ParkToken`
991 /// associated with a particular thread, which is unparked if `FilterOp::Unpark`
992 /// is returned.
993 ///
994 /// The `callback` function is also called while both queues are locked. It is
995 /// passed an `UnparkResult` indicating the number of threads that were unparked
996 /// and whether there are still parked threads in the queue. This `UnparkResult`
997 /// value is also returned by `unpark_filter`.
998 ///
999 /// The `callback` function should return an `UnparkToken` value which will be
1000 /// passed to all threads that are unparked. If no thread is unparked then the
1001 /// returned value is ignored.
1002 ///
1003 /// # Safety
1004 ///
1005 /// You should only call this function with an address that you control, since
1006 /// you could otherwise interfere with the operation of other synchronization
1007 /// primitives.
1008 ///
1009 /// The `filter` and `callback` functions are called while the queue is locked
1010 /// and must not panic or call into any function in `parking_lot`.
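///
/// # Example
///
/// A sketch of a reader/writer style wakeup (the token convention is made up for
/// this example): unpark queued readers until the first parked writer is
/// reached, then stop so the writer keeps its position in the queue.
///
/// ```ignore
/// use parking_lot_core::{FilterOp, ParkToken, DEFAULT_UNPARK_TOKEN};
///
/// const READER_TOKEN: ParkToken = ParkToken(1);
///
/// unsafe fn wake_readers(key: usize) -> usize {
///     let result = parking_lot_core::unpark_filter(
///         key,
///         |token| {
///             if token == READER_TOKEN {
///                 FilterOp::Unpark // wake this reader and keep scanning
///             } else {
///                 FilterOp::Stop   // a writer is next in line; leave it parked
///             }
///         },
///         |_result| DEFAULT_UNPARK_TOKEN,
///     );
///     result.unparked_threads
/// }
/// ```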
1011 #[inline]
1012 pub unsafe fn unpark_filter(
1013 key: usize,
1014 mut filter: impl FnMut(ParkToken) -> FilterOp,
1015 callback: impl FnOnce(UnparkResult) -> UnparkToken,
1016 ) -> UnparkResult {
1017 // Lock the bucket for the given key
1018 let bucket = lock_bucket(key);
1019
1020 // Go through the queue looking for threads with a matching key
1021 let mut link = &bucket.queue_head;
1022 let mut current = bucket.queue_head.get();
1023 let mut previous = ptr::null();
1024 let mut threads = SmallVec::<[_; 8]>::new();
1025 let mut result = UnparkResult::default();
1026 while !current.is_null() {
1027 if (*current).key.load(Ordering::Relaxed) == key {
1028 // Call the filter function with the thread's ParkToken
1029 let next = (*current).next_in_queue.get();
1030 match filter((*current).park_token.get()) {
1031 FilterOp::Unpark => {
1032 // Remove the thread from the queue
1033 link.set(next);
1034 if bucket.queue_tail.get() == current {
1035 bucket.queue_tail.set(previous);
1036 }
1037
1038 // Add the thread to our list of threads to unpark
1039 threads.push((current, None));
1040
1041 current = next;
1042 }
1043 FilterOp::Skip => {
1044 result.have_more_threads = true;
1045 link = &(*current).next_in_queue;
1046 previous = current;
1047 current = link.get();
1048 }
1049 FilterOp::Stop => {
1050 result.have_more_threads = true;
1051 break;
1052 }
1053 }
1054 } else {
1055 link = &(*current).next_in_queue;
1056 previous = current;
1057 current = link.get();
1058 }
1059 }
1060
1061 // Invoke the callback before waking up the threads
1062 result.unparked_threads = threads.len();
1063 if result.unparked_threads != 0 {
1064 result.be_fair = (*bucket.fair_timeout.get()).should_timeout();
1065 }
1066 let token = callback(result);
1067
1068 // Pass the token to all threads that are going to be unparked and prepare
1069 // them for unparking.
1070 for t in threads.iter_mut() {
1071 (*t.0).unpark_token.set(token);
1072 t.1 = Some((*t.0).parker.unpark_lock());
1073 }
1074
1075 // SAFETY: We hold the lock here, as required
1076 bucket.mutex.unlock();
1077
1078 // Now that we are outside the lock, wake up all the threads that we removed
1079 // from the queue.
1080 for (_, handle) in threads.into_iter() {
1081 handle.unchecked_unwrap().unpark();
1082 }
1083
1084 result
1085 }
1086
1087 /// \[Experimental\] Deadlock detection
1088 ///
1089 /// Enabled via the `deadlock_detection` feature flag.
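///
/// # Example
///
/// A sketch of the typical usage pattern, assuming the crate is built with the
/// `deadlock_detection` feature and accessed through the usual
/// `parking_lot_core::deadlock` path: a background thread periodically polls
/// `check_deadlock` and reports any cycles it finds.
///
/// ```ignore
/// use std::thread;
/// use std::time::Duration;
///
/// thread::spawn(|| loop {
///     thread::sleep(Duration::from_secs(10));
///     let deadlocks = parking_lot_core::deadlock::check_deadlock();
///     for (i, cycle) in deadlocks.iter().enumerate() {
///         eprintln!("Deadlock #{} involves {} threads:", i, cycle.len());
///         for t in cycle {
///             eprintln!("  thread id {}: {:#?}", t.thread_id(), t.backtrace());
///         }
///     }
/// });
/// ```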
1090 pub mod deadlock {
1091 #[cfg(feature = "deadlock_detection")]
1092 use super::deadlock_impl;
1093
1094 #[cfg(feature = "deadlock_detection")]
1095 pub(super) use super::deadlock_impl::DeadlockData;
1096
1097 /// Acquire a resource identified by `key` in the deadlock detector.
1098 /// This is a no-op if the `deadlock_detection` feature isn't enabled.
1099 ///
1100 /// # Safety
1101 ///
1102 /// Call after the resource is acquired
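///
/// # Example
///
/// A sketch (the `MyRawLock` type and its `lock_slow` method are hypothetical)
/// of how a custom lock built on `park` might report to the deadlock detector:
///
/// ```ignore
/// unsafe fn lock(raw: &MyRawLock) {
///     raw.lock_slow(); // parks on `raw`'s address via parking_lot_core::park
///     let key = raw as *const _ as usize;
///     // Record that this thread now owns the resource identified by `key`;
///     // `release_resource(key)` would be called just before unlocking.
///     parking_lot_core::deadlock::acquire_resource(key);
/// }
/// ```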
1103 #[inline]
1104 pub unsafe fn acquire_resource(_key: usize) {
1105 #[cfg(feature = "deadlock_detection")]
1106 deadlock_impl::acquire_resource(_key);
1107 }
1108
1109 /// Release a resource identified by `key` in the deadlock detector.
1110 /// This is a no-op if the `deadlock_detection` feature isn't enabled.
1111 ///
1112 /// # Panics
1113 ///
1114 /// Panics if the resource was already released or wasn't acquired in this thread.
1115 ///
1116 /// # Safety
1117 ///
1118 /// Call before the resource is released
1119 #[inline]
1120 pub unsafe fn release_resource(_key: usize) {
1121 #[cfg(feature = "deadlock_detection")]
1122 deadlock_impl::release_resource(_key);
1123 }
1124
1125 /// Returns all deadlocks detected *since* the last call.
1126 /// Each cycle consists of a vector of `DeadlockedThread`.
1127 #[cfg(feature = "deadlock_detection")]
1128 #[inline]
1129 pub fn check_deadlock() -> Vec<Vec<deadlock_impl::DeadlockedThread>> {
1130 deadlock_impl::check_deadlock()
1131 }
1132
1133 #[inline]
1134 pub(super) unsafe fn on_unpark(_td: &super::ThreadData) {
1135 #[cfg(feature = "deadlock_detection")]
1136 deadlock_impl::on_unpark(_td);
1137 }
1138 }
1139
1140 #[cfg(feature = "deadlock_detection")]
1141 mod deadlock_impl {
1142 use super::{get_hashtable, lock_bucket, with_thread_data, ThreadData, NUM_THREADS};
1143 use crate::thread_parker::{ThreadParkerT, UnparkHandleT};
1144 use crate::word_lock::WordLock;
1145 use backtrace::Backtrace;
1146 use petgraph;
1147 use petgraph::graphmap::DiGraphMap;
1148 use std::cell::{Cell, UnsafeCell};
1149 use std::collections::HashSet;
1150 use std::sync::atomic::Ordering;
1151 use std::sync::mpsc;
1152 use thread_id;
1153
1154 /// Representation of a deadlocked thread
1155 pub struct DeadlockedThread {
1156 thread_id: usize,
1157 backtrace: Backtrace,
1158 }
1159
1160 impl DeadlockedThread {
1161 /// The system thread id
1162 pub fn thread_id(&self) -> usize {
1163 self.thread_id
1164 }
1165
1166 /// The thread backtrace
1167 pub fn backtrace(&self) -> &Backtrace {
1168 &self.backtrace
1169 }
1170 }
1171
1172 pub struct DeadlockData {
1173 // Currently owned resources (keys)
1174 resources: UnsafeCell<Vec<usize>>,
1175
1176 // Set when there's a pending callstack request
1177 deadlocked: Cell<bool>,
1178
1179 // Sender used to report the backtrace
1180 backtrace_sender: UnsafeCell<Option<mpsc::Sender<DeadlockedThread>>>,
1181
1182 // System thread id
1183 thread_id: usize,
1184 }
1185
1186 impl DeadlockData {
1187 pub fn new() -> Self {
1188 DeadlockData {
1189 resources: UnsafeCell::new(Vec::new()),
1190 deadlocked: Cell::new(false),
1191 backtrace_sender: UnsafeCell::new(None),
1192 thread_id: thread_id::get(),
1193 }
1194 }
1195 }
1196
1197 pub(super) unsafe fn on_unpark(td: &ThreadData) {
1198 if td.deadlock_data.deadlocked.get() {
1199 let sender = (*td.deadlock_data.backtrace_sender.get()).take().unwrap();
1200 sender
1201 .send(DeadlockedThread {
1202 thread_id: td.deadlock_data.thread_id,
1203 backtrace: Backtrace::new(),
1204 })
1205 .unwrap();
1206 // make sure to close this sender
1207 drop(sender);
1208
1209 // park until the end of time
1210 td.parker.prepare_park();
1211 td.parker.park();
1212 unreachable!("unparked deadlocked thread!");
1213 }
1214 }
1215
1216 pub unsafe fn acquire_resource(key: usize) {
1217 with_thread_data(|thread_data| {
1218 (*thread_data.deadlock_data.resources.get()).push(key);
1219 });
1220 }
1221
1222 pub unsafe fn release_resource(key: usize) {
1223 with_thread_data(|thread_data| {
1224 let resources = &mut (*thread_data.deadlock_data.resources.get());
1225
1226 // There is only one situation where we can fail to find the
1227 // resource: we are currently running TLS destructors and our
1228 // ThreadData has already been freed. There isn't much we can do
1229 // about it at this point, so just ignore it.
1230 if let Some(p) = resources.iter().rposition(|x| *x == key) {
1231 resources.swap_remove(p);
1232 }
1233 });
1234 }
1235
1236 pub fn check_deadlock() -> Vec<Vec<DeadlockedThread>> {
1237 unsafe {
1238 // fast pass
1239 if check_wait_graph_fast() {
1240 // double check
1241 check_wait_graph_slow()
1242 } else {
1243 Vec::new()
1244 }
1245 }
1246 }
1247
1248 // Simple algorithm that builds a wait graph of the threads and the resources,
1249 // then checks for the presence of cycles (deadlocks).
1250 // This variant isn't precise as it doesn't lock the entire table before checking
1251 unsafe fn check_wait_graph_fast() -> bool {
1252 let table = get_hashtable();
1253 let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1254 let mut graph = DiGraphMap::<usize, ()>::with_capacity(thread_count * 2, thread_count * 2);
1255
1256 for b in &(*table).entries[..] {
1257 b.mutex.lock();
1258 let mut current = b.queue_head.get();
1259 while !current.is_null() {
1260 if !(*current).parked_with_timeout.get()
1261 && !(*current).deadlock_data.deadlocked.get()
1262 {
1263 // .resources are waiting for their owner
1264 for &resource in &(*(*current).deadlock_data.resources.get()) {
1265 graph.add_edge(resource, current as usize, ());
1266 }
1267 // owner waits for resource .key
1268 graph.add_edge(current as usize, (*current).key.load(Ordering::Relaxed), ());
1269 }
1270 current = (*current).next_in_queue.get();
1271 }
1272 // SAFETY: We hold the lock here, as required
1273 b.mutex.unlock();
1274 }
1275
1276 petgraph::algo::is_cyclic_directed(&graph)
1277 }
1278
1279 #[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Copy, Clone)]
1280 enum WaitGraphNode {
1281 Thread(*const ThreadData),
1282 Resource(usize),
1283 }
1284
1285 use self::WaitGraphNode::*;
1286
1287 // Contrary to the _fast variant, this locks the entire table before looking for cycles.
1288 // Returns all detected thread wait cycles.
1289 // Note that once a cycle is reported it's never reported again.
1290 unsafe fn check_wait_graph_slow() -> Vec<Vec<DeadlockedThread>> {
1291 static DEADLOCK_DETECTION_LOCK: WordLock = WordLock::new();
1292 DEADLOCK_DETECTION_LOCK.lock();
1293
1294 let mut table = get_hashtable();
1295 loop {
1296 // Lock all buckets in the old table
1297 for b in &table.entries[..] {
1298 b.mutex.lock();
1299 }
1300
1301 // Now check if our table is still the latest one. Another thread could
1302 // have grown the hash table between us getting and locking the hash table.
1303 let new_table = get_hashtable();
1304 if new_table as *const _ == table as *const _ {
1305 break;
1306 }
1307
1308 // Unlock buckets and try again
1309 for b in &table.entries[..] {
1310 // SAFETY: We hold the lock here, as required
1311 b.mutex.unlock();
1312 }
1313
1314 table = new_table;
1315 }
1316
1317 let thread_count = NUM_THREADS.load(Ordering::Relaxed);
1318 let mut graph =
1319 DiGraphMap::<WaitGraphNode, ()>::with_capacity(thread_count * 2, thread_count * 2);
1320
1321 for b in &table.entries[..] {
1322 let mut current = b.queue_head.get();
1323 while !current.is_null() {
1324 if !(*current).parked_with_timeout.get()
1325 && !(*current).deadlock_data.deadlocked.get()
1326 {
1327 // .resources are waiting for their owner
1328 for &resource in &(*(*current).deadlock_data.resources.get()) {
1329 graph.add_edge(Resource(resource), Thread(current), ());
1330 }
1331 // owner waits for resource .key
1332 graph.add_edge(
1333 Thread(current),
1334 Resource((*current).key.load(Ordering::Relaxed)),
1335 (),
1336 );
1337 }
1338 current = (*current).next_in_queue.get();
1339 }
1340 }
1341
1342 for b in &table.entries[..] {
1343 // SAFETY: We hold the lock here, as required
1344 b.mutex.unlock();
1345 }
1346
1347 // find cycles
1348 let cycles = graph_cycles(&graph);
1349
1350 let mut results = Vec::with_capacity(cycles.len());
1351
1352 for cycle in cycles {
1353 let (sender, receiver) = mpsc::channel();
1354 for td in cycle {
1355 let bucket = lock_bucket((*td).key.load(Ordering::Relaxed));
1356 (*td).deadlock_data.deadlocked.set(true);
1357 *(*td).deadlock_data.backtrace_sender.get() = Some(sender.clone());
1358 let handle = (*td).parker.unpark_lock();
1359 // SAFETY: We hold the lock here, as required
1360 bucket.mutex.unlock();
1361 // unpark the deadlocked thread!
1362 // on unpark it'll notice the deadlocked flag and report back
1363 handle.unpark();
1364 }
1365 // make sure to drop our sender before collecting results
1366 drop(sender);
1367 results.push(receiver.iter().collect());
1368 }
1369
1370 DEADLOCK_DETECTION_LOCK.unlock();
1371
1372 results
1373 }
1374
1375 // normalize a cycle to start with the "smallest" node
1376 fn normalize_cycle<T: Ord + Copy + Clone>(input: &[T]) -> Vec<T> {
1377 let min_pos = input
1378 .iter()
1379 .enumerate()
1380 .min_by_key(|&(_, &t)| t)
1381 .map(|(p, _)| p)
1382 .unwrap_or(0);
1383 input
1384 .iter()
1385 .cycle()
1386 .skip(min_pos)
1387 .take(input.len())
1388 .cloned()
1389 .collect()
1390 }
1391
1392 // returns all thread cycles in the wait graph
1393 fn graph_cycles(g: &DiGraphMap<WaitGraphNode, ()>) -> Vec<Vec<*const ThreadData>> {
1394 use petgraph::visit::depth_first_search;
1395 use petgraph::visit::DfsEvent;
1396 use petgraph::visit::NodeIndexable;
1397
1398 let mut cycles = HashSet::new();
1399 let mut path = Vec::with_capacity(g.node_bound());
1400 // start from threads to get the correct thread cycles
1401 let threads = g
1402 .nodes()
1403 .filter(|n| if let &Thread(_) = n { true } else { false });
1404
1405 depth_first_search(g, threads, |e| match e {
1406 DfsEvent::Discover(Thread(n), _) => path.push(n),
1407 DfsEvent::Finish(Thread(_), _) => {
1408 path.pop();
1409 }
1410 DfsEvent::BackEdge(_, Thread(n)) => {
1411 let from = path.iter().rposition(|&i| i == n).unwrap();
1412 cycles.insert(normalize_cycle(&path[from..]));
1413 }
1414 _ => (),
1415 });
1416
1417 cycles.iter().cloned().collect()
1418 }
1419 }
1420
1421 #[cfg(test)]
1422 mod tests {
1423 use super::{ThreadData, DEFAULT_PARK_TOKEN, DEFAULT_UNPARK_TOKEN};
1424 use std::{
1425 ptr,
1426 sync::{
1427 atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
1428 Arc,
1429 },
1430 thread,
1431 time::Duration,
1432 };
1433
1434 /// Calls a closure for every `ThreadData` currently parked on a given key
1435 fn for_each(key: usize, mut f: impl FnMut(&ThreadData)) {
1436 let bucket = super::lock_bucket(key);
1437
1438 let mut current: *const ThreadData = bucket.queue_head.get();
1439 while !current.is_null() {
1440 let current_ref = unsafe { &*current };
1441 if current_ref.key.load(Ordering::Relaxed) == key {
1442 f(current_ref);
1443 }
1444 current = current_ref.next_in_queue.get();
1445 }
1446
1447 // SAFETY: We hold the lock here, as required
1448 unsafe { bucket.mutex.unlock() };
1449 }
1450
1451 macro_rules! test {
1452 ( $( $name:ident(
1453 repeats: $repeats:expr,
1454 latches: $latches:expr,
1455 delay: $delay:expr,
1456 threads: $threads:expr,
1457 single_unparks: $single_unparks:expr);
1458 )* ) => {
1459 $(#[test]
1460 fn $name() {
1461 let delay = Duration::from_micros($delay);
1462 for _ in 0..$repeats {
1463 run_parking_test($latches, delay, $threads, $single_unparks);
1464 }
1465 })*
1466 };
1467 }
1468
1469 test! {
1470 unpark_all_one_fast(
1471 repeats: 10000, latches: 1, delay: 0, threads: 1, single_unparks: 0
1472 );
1473 unpark_all_hundred_fast(
1474 repeats: 100, latches: 1, delay: 0, threads: 100, single_unparks: 0
1475 );
1476 unpark_one_one_fast(
1477 repeats: 1000, latches: 1, delay: 0, threads: 1, single_unparks: 1
1478 );
1479 unpark_one_hundred_fast(
1480 repeats: 20, latches: 1, delay: 0, threads: 100, single_unparks: 100
1481 );
1482 unpark_one_fifty_then_fifty_all_fast(
1483 repeats: 50, latches: 1, delay: 0, threads: 100, single_unparks: 50
1484 );
1485 unpark_all_one(
1486 repeats: 100, latches: 1, delay: 10000, threads: 1, single_unparks: 0
1487 );
1488 unpark_all_hundred(
1489 repeats: 100, latches: 1, delay: 10000, threads: 100, single_unparks: 0
1490 );
1491 unpark_one_one(
1492 repeats: 10, latches: 1, delay: 10000, threads: 1, single_unparks: 1
1493 );
1494 unpark_one_fifty(
1495 repeats: 1, latches: 1, delay: 10000, threads: 50, single_unparks: 50
1496 );
1497 unpark_one_fifty_then_fifty_all(
1498 repeats: 2, latches: 1, delay: 10000, threads: 100, single_unparks: 50
1499 );
1500 hundred_unpark_all_one_fast(
1501 repeats: 100, latches: 100, delay: 0, threads: 1, single_unparks: 0
1502 );
1503 hundred_unpark_all_one(
1504 repeats: 1, latches: 100, delay: 10000, threads: 1, single_unparks: 0
1505 );
1506 }
1507
1508 fn run_parking_test(
1509 num_latches: usize,
1510 delay: Duration,
1511 num_threads: usize,
1512 num_single_unparks: usize,
1513 ) {
1514 let mut tests = Vec::with_capacity(num_latches);
1515
1516 for _ in 0..num_latches {
1517 let test = Arc::new(SingleLatchTest::new(num_threads));
1518 let mut threads = Vec::with_capacity(num_threads);
1519 for _ in 0..num_threads {
1520 let test = test.clone();
1521 threads.push(thread::spawn(move || test.run()));
1522 }
1523 tests.push((test, threads));
1524 }
1525
1526 for unpark_index in 0..num_single_unparks {
1527 thread::sleep(delay);
1528 for (test, _) in &tests {
1529 test.unpark_one(unpark_index);
1530 }
1531 }
1532
1533 for (test, threads) in tests {
1534 test.finish(num_single_unparks);
1535 for thread in threads {
1536 thread.join().expect("Test thread panic");
1537 }
1538 }
1539 }
1540
1541 struct SingleLatchTest {
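/// Semaphore value: when non-negative it is the number of free permits; when
/// negative, its magnitude is (roughly) the number of threads waiting, or about
/// to wait, on `semaphore_addr()`.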
1542 semaphore: AtomicIsize,
1543 num_awake: AtomicUsize,
1544 /// Holds the pointer to the last *unprocessed* woken up thread.
1545 last_awoken: AtomicPtr<ThreadData>,
1546 /// Total number of threads participating in this test.
1547 num_threads: usize,
1548 }
1549
1550 impl SingleLatchTest {
1551 pub fn new(num_threads: usize) -> Self {
1552 Self {
1553 // This implements a fair (FIFO) semaphore, and it starts out unavailable.
1554 semaphore: AtomicIsize::new(0),
1555 num_awake: AtomicUsize::new(0),
1556 last_awoken: AtomicPtr::new(ptr::null_mut()),
1557 num_threads,
1558 }
1559 }
1560
1561 pub fn run(&self) {
1562 // Get one slot from the semaphore
1563 self.down();
1564
1565 // Report back to the test verification code that this thread woke up
1566 let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
1567 self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
1568 self.num_awake.fetch_add(1, Ordering::SeqCst);
1569 }
1570
1571 pub fn unpark_one(&self, single_unpark_index: usize) {
1572 // last_awoken should be null at all times except between the call to self.up() and
1573 // the bottom of this method, where it's reset to null again
1574 assert!(self.last_awoken.load(Ordering::SeqCst).is_null());
1575
1576 let mut queue: Vec<*mut ThreadData> = Vec::with_capacity(self.num_threads);
1577 for_each(self.semaphore_addr(), |thread_data| {
1578 queue.push(thread_data as *const _ as *mut _);
1579 });
1580 assert!(queue.len() <= self.num_threads - single_unpark_index);
1581
1582 let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);
1583
1584 self.up();
1585
1586 // Wait for a parked thread to wake up and update num_awake + last_awoken.
1587 while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
1588 thread::yield_now();
1589 }
1590
1591 // At this point the other thread should have set last_awoken inside the run() method
1592 let last_awoken = self.last_awoken.load(Ordering::SeqCst);
1593 assert!(!last_awoken.is_null());
1594 if !queue.is_empty() && queue[0] != last_awoken {
1595 panic!(
1596 "Woke up wrong thread:\n\tqueue: {:?}\n\tlast awoken: {:?}",
1597 queue, last_awoken
1598 );
1599 }
1600 self.last_awoken.store(ptr::null_mut(), Ordering::SeqCst);
1601 }
1602
1603 pub fn finish(&self, num_single_unparks: usize) {
1604 // The number of threads not unparked via unpark_one
1605 let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();
1606
1607 // Wake remaining threads up with unpark_all. Has to be in a loop, because there might
1608 // still be threads that have not yet parked.
1609 while num_threads_left > 0 {
1610 let mut num_waiting_on_address = 0;
1611 for_each(self.semaphore_addr(), |_thread_data| {
1612 num_waiting_on_address += 1;
1613 });
1614 assert!(num_waiting_on_address <= num_threads_left);
1615
1616 let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);
1617
1618 let num_unparked =
1619 unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
1620 assert!(num_unparked >= num_waiting_on_address);
1621 assert!(num_unparked <= num_threads_left);
1622
1623 // Wait for all unparked threads to wake up and update num_awake + last_awoken.
1624 while self.num_awake.load(Ordering::SeqCst)
1625 != num_awake_before_unpark + num_unparked
1626 {
1627 thread::yield_now()
1628 }
1629
1630 num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
1631 }
1632 // By now, all threads should have been woken up
1633 assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);
1634
1635 // Make sure no thread is parked on our semaphore address
1636 let mut num_waiting_on_address = 0;
1637 for_each(self.semaphore_addr(), |_thread_data| {
1638 num_waiting_on_address += 1;
1639 });
1640 assert_eq!(num_waiting_on_address, 0);
1641 }
1642
1643 pub fn down(&self) {
1644 let old_semaphore_value = self.semaphore.fetch_sub(1, Ordering::SeqCst);
1645
1646 if old_semaphore_value > 0 {
1647 // We acquired the semaphore. Done.
1648 return;
1649 }
1650
1651 // We need to wait.
1652 let validate = || true;
1653 let before_sleep = || {};
1654 let timed_out = |_, _| {};
1655 unsafe {
1656 super::park(
1657 self.semaphore_addr(),
1658 validate,
1659 before_sleep,
1660 timed_out,
1661 DEFAULT_PARK_TOKEN,
1662 None,
1663 );
1664 }
1665 }
1666
1667 pub fn up(&self) {
1668 let old_semaphore_value = self.semaphore.fetch_add(1, Ordering::SeqCst);
1669
1670 // Check if anyone was waiting on the semaphore. If they were, then pass ownership to them.
1671 if old_semaphore_value < 0 {
1672 // We need to continue until we have actually unparked someone. It might be that
1673 // the thread we want to pass ownership to has decremented the semaphore counter,
1674 // but not yet parked.
1675 loop {
1676 match unsafe {
1677 super::unpark_one(self.semaphore_addr(), |_| DEFAULT_UNPARK_TOKEN)
1678 .unparked_threads
1679 } {
1680 1 => break,
1681 0 => (),
1682 i => panic!("Should not wake up {} threads", i),
1683 }
1684 }
1685 }
1686 }
1687
1688 fn semaphore_addr(&self) -> usize {
1689 &self.semaphore as *const _ as usize
1690 }
1691 }
1692 }
1693