// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::elision::{have_elision, AtomicElisionExt};
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::util;
use core::{
    cell::Cell,
    sync::atomic::{AtomicUsize, Ordering},
};
use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
use parking_lot_core::{
    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
};
use std::time::{Duration, Instant};

// This reader-writer lock implementation is based on Boost's upgrade_mutex:
// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
//
// This implementation uses 2 wait queues, one at key [addr] and one at key
// [addr + 1]. The primary queue is used for all new waiting threads, and the
// secondary queue is used by the thread which has acquired WRITER_BIT but is
// waiting for the remaining readers to exit the lock.
//
// This implementation is fair between readers and writers since it uses the
// order in which threads first started queuing to alternate between read phases
// and write phases. In particular it is not vulnerable to write starvation
// since readers will block if there is a pending writer.
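//
// Concretely, both queue keys are derived from the lock's own address: the
// primary queue parks on `self as *const _ as usize` and the secondary queue
// on that address plus one (see `lock_common`, `wait_for_readers` and
// `unlock_shared_slow` below).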

// There is at least one thread in the main queue.
const PARKED_BIT: usize = 0b0001;
// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
const WRITER_PARKED_BIT: usize = 0b0010;
// A reader is holding an upgradable lock. The reader count must be non-zero and
// WRITER_BIT must not be set.
const UPGRADABLE_BIT: usize = 0b0100;
// If the reader count is zero: a writer is currently holding an exclusive lock.
// Otherwise: a writer is waiting for the remaining readers to exit the lock.
const WRITER_BIT: usize = 0b1000;
// Mask of bits used to count readers.
const READERS_MASK: usize = !0b1111;
// Base unit for counting readers.
const ONE_READER: usize = 0b10000;
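//
// As an illustration (not a named constant): a state with two shared readers
// and a parked waiting writer is (2 * ONE_READER) | WRITER_BIT | PARKED_BIT,
// i.e. 0b101001.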

// Token indicating what type of lock a queued thread is trying to acquire
const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);

/// Raw reader-writer lock type backed by the parking lot.
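///
/// This raw lock is normally consumed through the `lock_api` wrappers rather
/// than called directly. A minimal usage sketch (the `RwLock` alias below is
/// illustrative; parking_lot exposes it as `parking_lot::RwLock<T>`):
///
/// ```ignore
/// type RwLock<T> = lock_api::RwLock<RawRwLock, T>;
///
/// let lock = RwLock::new(5);
/// {
///     // Any number of readers can hold the lock at once.
///     let r1 = lock.read();
///     let r2 = lock.read();
///     assert_eq!(*r1 + *r2, 10);
/// }
/// {
///     // A writer gets exclusive access.
///     let mut w = lock.write();
///     *w += 1;
/// }
/// ```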
pub struct RawRwLock {
    state: AtomicUsize,
}

unsafe impl lock_api::RawRwLock for RawRwLock {
    const INIT: RawRwLock = RawRwLock {
        state: AtomicUsize::new(0),
    };

    type GuardMarker = crate::GuardMarker;

    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            let result = self.lock_exclusive_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        if self
            .state
            .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            self.deadlock_acquire();
            true
        } else {
            false
        }
    }

    #[inline]
    unsafe fn unlock_exclusive(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(false);
    }

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast(false) {
            let result = self.lock_shared_slow(false, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.try_lock_shared_slow(false)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_shared(&self) {
        self.deadlock_release();
        let state = if have_elision() {
            self.state.elision_fetch_sub_release(ONE_READER)
        } else {
            self.state.fetch_sub(ONE_READER, Ordering::Release)
        };
        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
            self.unlock_shared_slow();
        }
    }

    #[inline]
    fn is_locked(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT | READERS_MASK) != 0
    }

    #[inline]
    fn is_locked_exclusive(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT) != 0
    }
}

unsafe impl lock_api::RawRwLockFair for RawRwLock {
    #[inline]
    unsafe fn unlock_shared_fair(&self) {
        // Shared unlocking is always fair in this implementation.
        self.unlock_shared();
    }

    #[inline]
    unsafe fn unlock_exclusive_fair(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(true);
    }

    #[inline]
    unsafe fn bump_shared(&self) {
        if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
            == ONE_READER | WRITER_BIT
        {
            self.bump_shared_slow();
        }
    }

    #[inline]
    unsafe fn bump_exclusive(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_exclusive_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade(&self) {
        let state = self
            .state
            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);

        // Wake up parked shared and upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockTimed for RawRwLock {
    type Duration = Duration;
    type Instant = Instant;

    #[inline]
    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
    #[inline]
    fn lock_shared_recursive(&self) {
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared_recursive(&self) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.try_lock_shared_slow(true)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
    #[inline]
    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
    #[inline]
    fn lock_upgradable(&self) {
        if !self.try_lock_upgradable_fast() {
            let result = self.lock_upgradable_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_upgradable(&self) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.try_lock_upgradable_slow()
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_upgradable(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        #[allow(clippy::collapsible_if)]
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn upgrade(&self) {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Acquire,
        );
        if state & READERS_MASK != ONE_READER {
            let result = self.upgrade_slow(None);
            debug_assert!(result);
        }
    }

    #[inline]
    unsafe fn try_upgrade(&self) -> bool {
        if self
            .state
            .compare_exchange_weak(
                ONE_READER | UPGRADABLE_BIT,
                WRITER_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.try_upgrade_slow()
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
    #[inline]
    unsafe fn unlock_upgradable_fair(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        #[allow(clippy::collapsible_if)]
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn bump_upgradable(&self) {
        if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
            self.bump_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade_upgradable(&self) {
        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);

        // Wake up parked upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }

    #[inline]
    unsafe fn downgrade_to_upgradable(&self) {
        let state = self.state.fetch_add(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Release,
        );

        // Wake up parked shared threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_to_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
    #[inline]
    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(Some(timeout))
        }
    }

    #[inline]
    unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(util::to_deadline(timeout))
        }
    }
}

impl RawRwLock {
    #[inline(always)]
    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't allow grabbing a shared lock if there is a writer, even if
        // the writer is still waiting for the remaining readers to exit.
        if state & WRITER_BIT != 0 {
            // To allow recursive locks, we make an exception and allow readers
            // to skip ahead of a pending writer to avoid deadlocking, at the
            // cost of breaking the fairness guarantees.
            if !recursive || state & READERS_MASK == 0 {
                return false;
            }
        }

        // Use hardware lock elision to avoid cache conflicts when multiple
        // readers try to acquire the lock. We only do this if the lock is
        // completely empty since elision handles conflicts poorly.
        if have_elision() && state == 0 {
            self.state
                .elision_compare_exchange_acquire(0, ONE_READER)
                .is_ok()
        } else if let Some(new_state) = state.checked_add(ONE_READER) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_shared_fast
            #[allow(clippy::collapsible_if)]
            if state & WRITER_BIT != 0 {
                if !recursive || state & READERS_MASK == 0 {
                    return false;
                }
            }
            if have_elision() && state == 0 {
                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            } else {
                match self.state.compare_exchange_weak(
                    state,
                    state
                        .checked_add(ONE_READER)
                        .expect("RwLock reader count overflow"),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            }
        }
    }

    #[inline(always)]
    fn try_lock_upgradable_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't grab an upgradable lock if there is already a writer or
        // upgradable reader.
        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
            return false;
        }

        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_upgradable_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_upgradable_fast
            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                return false;
            }

            match self.state.compare_exchange_weak(
                state,
                state
                    .checked_add(ONE_READER | UPGRADABLE_BIT)
                    .expect("RwLock reader count overflow"),
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                // Grab WRITER_BIT if it isn't set, even if there are parked threads.
                match self.state.compare_exchange_weak(
                    *state,
                    *state | WRITER_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => *state = x,
                }
            }
        };

        // Step 1: grab exclusive ownership of WRITER_BIT
        let timed_out = !self.lock_common(
            timeout,
            TOKEN_EXCLUSIVE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        );
        if timed_out {
            return false;
        }

        // Step 2: wait for all remaining readers to exit the lock.
        self.wait_for_readers(timeout, 0)
    }

    #[cold]
    fn unlock_exclusive_slow(&self, force_fair: bool) {
        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |mut new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                }
                self.state.store(new_state, Ordering::Release);
                TOKEN_HANDOFF
            } else {
                // Clear the parked bit if there are no more parked threads.
                if result.have_more_threads {
                    self.state.store(PARKED_BIT, Ordering::Release);
                } else {
                    self.state.store(0, Ordering::Release);
                }
                TOKEN_NORMAL
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                // Use hardware lock elision to avoid cache conflicts when multiple
                // readers try to acquire the lock. We only do this if the lock is
                // completely empty since elision handles conflicts poorly.
                if have_elision() && *state == 0 {
                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                        Ok(_) => return true,
                        Err(x) => *state = x,
                    }
                }

                // This is the same condition as try_lock_shared_fast
                #[allow(clippy::collapsible_if)]
                if *state & WRITER_BIT != 0 {
                    if !recursive || *state & READERS_MASK == 0 {
                        return false;
                    }
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
    }

    #[cold]
    fn unlock_shared_slow(&self) {
        // At this point WRITER_PARKED_BIT is set and READERS_MASK is empty. We
        // just need to wake up a potentially sleeping pending writer.
        // Using the 2nd key at addr + 1
        let addr = self as *const _ as usize + 1;
        let callback = |_result: UnparkResult| {
            // Clear the WRITER_PARKED_BIT here since there can only be one
            // parked writer thread.
            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
            TOKEN_NORMAL
        };
        // SAFETY:
        //   * `addr` is an address we control.
        //   * `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            parking_lot_core::unpark_one(addr, callback);
        }
    }

    #[cold]
    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER | UPGRADABLE_BIT)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(
            timeout,
            TOKEN_UPGRADABLE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        )
    }

    #[cold]
    fn unlock_upgradable_slow(&self, force_fair: bool) {
        // Just release the lock if there are no parked threads.
        let mut state = self.state.load(Ordering::Relaxed);
        while state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(x) => state = x,
            }
        }

        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            let mut state = self.state.load(Ordering::Relaxed);
            if force_fair || result.be_fair {
                // Fall back to normal unpark on overflow. Panicking is
                // not allowed in parking_lot callbacks.
                while let Some(mut new_state) =
                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
                {
                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    } else {
                        new_state &= !PARKED_BIT;
                    }
                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return TOKEN_HANDOFF,
                        Err(x) => state = x,
                    }
                }
            }

            // Otherwise just release the upgradable lock and update PARKED_BIT.
            loop {
                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                } else {
                    new_state &= !PARKED_BIT;
                }
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return TOKEN_NORMAL,
                    Err(x) => state = x,
                }
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn try_upgrade_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & READERS_MASK != ONE_READER {
                return false;
            }
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
        self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
    }

    #[cold]
    fn downgrade_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER, callback);
        }
    }

    #[cold]
    fn downgrade_to_upgradable_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
        }
    }

    #[cold]
    unsafe fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }

    #[cold]
    fn bump_exclusive_slow(&self) {
        self.deadlock_release();
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }

    #[cold]
    fn bump_upgradable_slow(&self) {
        self.deadlock_release();
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }

    /// Common code for waking up parked threads after releasing `WRITER_BIT` or
    /// `UPGRADABLE_BIT`.
    ///
    /// # Safety
    ///
    /// `callback` must uphold the requirements of the `callback` parameter to
    /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
    /// `parking_lot`.
    #[inline]
    unsafe fn wake_parked_threads(
        &self,
        new_state: usize,
        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
    ) {
        // We must wake up at least one upgrader or writer if there is one,
        // otherwise they may end up parked indefinitely since unlock_shared
        // does not call wake_parked_threads.
        let new_state = Cell::new(new_state);
        let addr = self as *const _ as usize;
        let filter = |ParkToken(token)| {
            let s = new_state.get();

            // If we are waking up a writer, don't wake anything else.
            if s & WRITER_BIT != 0 {
                return FilterOp::Stop;
            }

            // Otherwise wake *all* readers and one upgrader/writer.
            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
                // Skip writers and upgradable readers if we already have
                // a writer/upgradable reader.
                FilterOp::Skip
            } else {
                new_state.set(s + token);
                FilterOp::Unpark
            }
        };
        let callback = |result| callback(new_state.get(), result);
        // SAFETY:
        // * `addr` is an address we control.
        // * `filter` does not panic or call into any function of `parking_lot`.
        // * upholding `callback`'s safety requirements is the caller's responsibility.
        parking_lot_core::unpark_filter(addr, filter, callback);
    }

    // Common code for waiting for readers to exit the lock after acquiring
    // WRITER_BIT.
    #[inline]
    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
        // At this point WRITER_BIT is already set, we just need to wait for the
        // remaining readers to exit the lock.
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Acquire);
        while state & READERS_MASK != 0 {
            // Spin a few times to wait for readers to exit
            if spinwait.spin() {
                state = self.state.load(Ordering::Acquire);
                continue;
            }

            // Set the parked bit
            if state & WRITER_PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | WRITER_PARKED_BIT,
                    Ordering::Acquire,
                    Ordering::Acquire,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            // Using the 2nd key at addr + 1
            let addr = self as *const _ as usize + 1;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
            };
            let before_sleep = || {};
            let timed_out = |_, _| {};
            // SAFETY:
            //   * `addr` is an address we control.
            //   * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            //   * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                )
            };
            match park_result {
                // We still need to re-check the state if we are unparked
                // since a previous writer timing-out could have allowed
                // another reader to sneak in before we parked.
                ParkResult::Unparked(_) | ParkResult::Invalid => {
                    state = self.state.load(Ordering::Acquire);
                    continue;
                }

                // Timeout expired
                ParkResult::TimedOut => {
                    // We need to release WRITER_BIT and revert back to
                    // our previous value. We also wake up any threads that
                    // might be waiting on WRITER_BIT.
                    let state = self.state.fetch_add(
                        prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
                        Ordering::Relaxed,
                    );
                    if state & PARKED_BIT != 0 {
                        let callback = |_, result: UnparkResult| {
                            // Clear the parked bit if there are no more parked threads
                            if !result.have_more_threads {
                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                            }
                            TOKEN_NORMAL
                        };
                        // SAFETY: `callback` does not panic or call any function of `parking_lot`.
                        unsafe {
                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
                        }
                    }
                    return false;
                }
            }
        }
        true
    }

    /// Common code for acquiring a lock
    #[inline]
    fn lock_common(
        &self,
        timeout: Option<Instant>,
        token: ParkToken,
        mut try_lock: impl FnMut(&mut usize) -> bool,
        validate_flags: usize,
    ) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Attempt to grab the lock
            if try_lock(&mut state) {
                return true;
            }

            // If there are no parked threads, try spinning a few times.
            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            let addr = self as *const _ as usize;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & PARKED_BIT != 0 && (state & validate_flags != 0)
            };
            let before_sleep = || {};
            let timed_out = |_, was_last_thread| {
                // Clear the parked bit if we were the last parked thread
                if was_last_thread {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
            };

            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
            };
            match park_result {
                // The thread that unparked us passed the lock on to us
                // directly without unlocking it.
                ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                // We were unparked normally, try acquiring the lock again
                ParkResult::Unparked(_) => (),

                // The validation function failed, try locking again
                ParkResult::Invalid => (),

                // Timeout expired
                ParkResult::TimedOut => return false,
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[inline]
    fn deadlock_acquire(&self) {
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
    }

    #[inline]
    fn deadlock_release(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
    }
}
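
// A minimal single-threaded smoke-test sketch exercising the raw lock through
// the `lock_api` traits used above; unlock and upgrade calls are `unsafe`
// because the caller must guarantee the lock is held in the matching mode.
#[cfg(test)]
mod sketch_tests {
    use super::RawRwLock;
    use lock_api::{RawRwLock as _, RawRwLockTimed, RawRwLockUpgrade};
    use std::time::Duration;

    #[test]
    fn shared_then_exclusive() {
        let lock = RawRwLock::INIT;
        lock.lock_shared();
        assert!(lock.is_locked());
        // An exclusive attempt fails while a reader holds the lock.
        assert!(!lock.try_lock_exclusive());
        unsafe { lock.unlock_shared() };
        assert!(lock.try_lock_exclusive());
        unsafe { lock.unlock_exclusive() };
    }

    #[test]
    fn upgrade_sole_reader() {
        let lock = RawRwLock::INIT;
        lock.lock_upgradable();
        // With no other readers present, the upgrade succeeds immediately.
        assert!(unsafe { lock.try_upgrade() });
        unsafe { lock.unlock_exclusive() };
    }

    #[test]
    fn timed_exclusive_gives_up_under_reader() {
        let lock = RawRwLock::INIT;
        lock.lock_shared();
        // The writer grabs WRITER_BIT but times out waiting for the reader to leave,
        // then reverts its changes to the state word.
        assert!(!lock.try_lock_exclusive_for(Duration::from_millis(10)));
        unsafe { lock.unlock_shared() };
    }
}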