// Copyright 2016 Amanieu d'Antras
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use crate::elision::{have_elision, AtomicElisionExt};
use crate::raw_mutex::{TOKEN_HANDOFF, TOKEN_NORMAL};
use crate::util;
use core::{
    cell::Cell,
    sync::atomic::{AtomicUsize, Ordering},
};
use lock_api::{RawRwLock as RawRwLock_, RawRwLockUpgrade};
use parking_lot_core::{
    self, deadlock, FilterOp, ParkResult, ParkToken, SpinWait, UnparkResult, UnparkToken,
};
use std::time::{Duration, Instant};

// This reader-writer lock implementation is based on Boost's upgrade_mutex:
// https://github.com/boostorg/thread/blob/fc08c1fe2840baeeee143440fba31ef9e9a813c8/include/boost/thread/v2/shared_mutex.hpp#L432
//
// This implementation uses 2 wait queues, one at key [addr] and one at key
// [addr + 1]. The primary queue is used for all new waiting threads, and the
// secondary queue is used by the thread which has acquired WRITER_BIT but is
// waiting for the remaining readers to exit the lock.
//
// This implementation is fair between readers and writers since it uses the
// order in which threads first started queuing to alternate between read phases
// and write phases. In particular it is not vulnerable to write starvation
// since readers will block if there is a pending writer.

// There is at least one thread in the main queue.
const PARKED_BIT: usize = 0b0001;
// There is a parked thread holding WRITER_BIT. WRITER_BIT must be set.
const WRITER_PARKED_BIT: usize = 0b0010;
// A reader is holding an upgradable lock. The reader count must be non-zero and
// WRITER_BIT must not be set.
const UPGRADABLE_BIT: usize = 0b0100;
// If the reader count is zero: a writer is currently holding an exclusive lock.
// Otherwise: a writer is waiting for the remaining readers to exit the lock.
const WRITER_BIT: usize = 0b1000;
// Mask of bits used to count readers.
const READERS_MASK: usize = !0b1111;
// Base unit for counting readers.
const ONE_READER: usize = 0b10000;

// Token indicating what type of lock a queued thread is trying to acquire
const TOKEN_SHARED: ParkToken = ParkToken(ONE_READER);
const TOKEN_EXCLUSIVE: ParkToken = ParkToken(WRITER_BIT);
const TOKEN_UPGRADABLE: ParkToken = ParkToken(ONE_READER | UPGRADABLE_BIT);

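// For illustration, a few example state values under this encoding (derived
// from the constants above): each shared reader adds ONE_READER (0b1_0000),
// so three readers with a writer parked on the secondary queue waiting for
// them to drain is
//     3 * ONE_READER | WRITER_BIT | WRITER_PARKED_BIT == 0b11_1010
// and a single upgradable reader with threads parked on the primary queue is
//     ONE_READER | UPGRADABLE_BIT | PARKED_BIT == 0b1_0101.
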
/// Raw reader-writer lock type backed by the parking lot.
pub struct RawRwLock {
    state: AtomicUsize,
}

unsafe impl lock_api::RawRwLock for RawRwLock {
    const INIT: RawRwLock = RawRwLock {
        state: AtomicUsize::new(0),
    };

    type GuardMarker = crate::GuardMarker;

    #[inline]
    fn lock_exclusive(&self) {
        if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            let result = self.lock_exclusive_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_exclusive(&self) -> bool {
        if self
            .state
            .compare_exchange(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            self.deadlock_acquire();
            true
        } else {
            false
        }
    }

    #[inline]
    unsafe fn unlock_exclusive(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(false);
    }

    #[inline]
    fn lock_shared(&self) {
        if !self.try_lock_shared_fast(false) {
            let result = self.lock_shared_slow(false, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared(&self) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.try_lock_shared_slow(false)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_shared(&self) {
        self.deadlock_release();
        let state = if have_elision() {
            self.state.elision_fetch_sub_release(ONE_READER)
        } else {
            self.state.fetch_sub(ONE_READER, Ordering::Release)
        };
        if state & (READERS_MASK | WRITER_PARKED_BIT) == (ONE_READER | WRITER_PARKED_BIT) {
            self.unlock_shared_slow();
        }
    }

    #[inline]
    fn is_locked(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT | READERS_MASK) != 0
    }

    #[inline]
    fn is_locked_exclusive(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);
        state & (WRITER_BIT) != 0
    }
}

unsafe impl lock_api::RawRwLockFair for RawRwLock {
    #[inline]
    unsafe fn unlock_shared_fair(&self) {
        // Shared unlocking is always fair in this implementation.
        self.unlock_shared();
    }

    #[inline]
    unsafe fn unlock_exclusive_fair(&self) {
        self.deadlock_release();
        if self
            .state
            .compare_exchange(WRITER_BIT, 0, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            return;
        }
        self.unlock_exclusive_slow(true);
    }

    #[inline]
    unsafe fn bump_shared(&self) {
        if self.state.load(Ordering::Relaxed) & (READERS_MASK | WRITER_BIT)
            == ONE_READER | WRITER_BIT
        {
            self.bump_shared_slow();
        }
    }

    #[inline]
    unsafe fn bump_exclusive(&self) {
        if self.state.load(Ordering::Relaxed) & PARKED_BIT != 0 {
            self.bump_exclusive_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade(&self) {
        let state = self
            .state
            .fetch_add(ONE_READER - WRITER_BIT, Ordering::Release);

        // Wake up parked shared and upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockTimed for RawRwLock {
    type Duration = Duration;
    type Instant = Instant;

    #[inline]
    fn try_lock_shared_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(false) {
            true
        } else {
            self.lock_shared_slow(false, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_for(&self, timeout: Duration) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_exclusive_until(&self, timeout: Instant) -> bool {
        let result = if self
            .state
            .compare_exchange_weak(0, WRITER_BIT, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            true
        } else {
            self.lock_exclusive_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursive for RawRwLock {
    #[inline]
    fn lock_shared_recursive(&self) {
        if !self.try_lock_shared_fast(true) {
            let result = self.lock_shared_slow(true, None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_shared_recursive(&self) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.try_lock_shared_slow(true)
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockRecursiveTimed for RawRwLock {
    #[inline]
    fn try_lock_shared_recursive_for(&self, timeout: Self::Duration) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_shared_recursive_until(&self, timeout: Self::Instant) -> bool {
        let result = if self.try_lock_shared_fast(true) {
            true
        } else {
            self.lock_shared_slow(true, Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }
}

unsafe impl lock_api::RawRwLockUpgrade for RawRwLock {
    #[inline]
    fn lock_upgradable(&self) {
        if !self.try_lock_upgradable_fast() {
            let result = self.lock_upgradable_slow(None);
            debug_assert!(result);
        }
        self.deadlock_acquire();
    }

    #[inline]
    fn try_lock_upgradable(&self) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.try_lock_upgradable_slow()
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn unlock_upgradable(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        #[allow(clippy::collapsible_if)]
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn upgrade(&self) {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Acquire,
        );
        if state & READERS_MASK != ONE_READER {
            let result = self.upgrade_slow(None);
            debug_assert!(result);
        }
    }

    #[inline]
    unsafe fn try_upgrade(&self) -> bool {
        if self
            .state
            .compare_exchange_weak(
                ONE_READER | UPGRADABLE_BIT,
                WRITER_BIT,
                Ordering::Acquire,
                Ordering::Relaxed,
            )
            .is_ok()
        {
            true
        } else {
            self.try_upgrade_slow()
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeFair for RawRwLock {
    #[inline]
    unsafe fn unlock_upgradable_fair(&self) {
        self.deadlock_release();
        let state = self.state.load(Ordering::Relaxed);
        #[allow(clippy::collapsible_if)]
        if state & PARKED_BIT == 0 {
            if self
                .state
                .compare_exchange_weak(
                    state,
                    state - (ONE_READER | UPGRADABLE_BIT),
                    Ordering::Release,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
        self.unlock_upgradable_slow(false);
    }

    #[inline]
    unsafe fn bump_upgradable(&self) {
        if self.state.load(Ordering::Relaxed) == ONE_READER | UPGRADABLE_BIT | PARKED_BIT {
            self.bump_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeDowngrade for RawRwLock {
    #[inline]
    unsafe fn downgrade_upgradable(&self) {
        let state = self.state.fetch_sub(UPGRADABLE_BIT, Ordering::Relaxed);

        // Wake up parked upgradable threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_slow();
        }
    }

    #[inline]
    unsafe fn downgrade_to_upgradable(&self) {
        let state = self.state.fetch_add(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Release,
        );

        // Wake up parked shared threads if there are any
        if state & PARKED_BIT != 0 {
            self.downgrade_to_upgradable_slow();
        }
    }
}

unsafe impl lock_api::RawRwLockUpgradeTimed for RawRwLock {
    #[inline]
    fn try_lock_upgradable_until(&self, timeout: Instant) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(Some(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    fn try_lock_upgradable_for(&self, timeout: Duration) -> bool {
        let result = if self.try_lock_upgradable_fast() {
            true
        } else {
            self.lock_upgradable_slow(util::to_deadline(timeout))
        };
        if result {
            self.deadlock_acquire();
        }
        result
    }

    #[inline]
    unsafe fn try_upgrade_until(&self, timeout: Instant) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(Some(timeout))
        }
    }

    #[inline]
    unsafe fn try_upgrade_for(&self, timeout: Duration) -> bool {
        let state = self.state.fetch_sub(
            (ONE_READER | UPGRADABLE_BIT) - WRITER_BIT,
            Ordering::Relaxed,
        );
        if state & READERS_MASK == ONE_READER {
            true
        } else {
            self.upgrade_slow(util::to_deadline(timeout))
        }
    }
}

impl RawRwLock {
    #[inline(always)]
    fn try_lock_shared_fast(&self, recursive: bool) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't allow grabbing a shared lock if there is a writer, even if
        // the writer is still waiting for the remaining readers to exit.
        if state & WRITER_BIT != 0 {
            // To allow recursive locks, we make an exception and allow readers
            // to skip ahead of a pending writer to avoid deadlocking, at the
            // cost of breaking the fairness guarantees.
            if !recursive || state & READERS_MASK == 0 {
                return false;
            }
        }

        // Use hardware lock elision to avoid cache conflicts when multiple
        // readers try to acquire the lock. We only do this if the lock is
        // completely empty since elision handles conflicts poorly.
        if have_elision() && state == 0 {
            self.state
                .elision_compare_exchange_acquire(0, ONE_READER)
                .is_ok()
        } else if let Some(new_state) = state.checked_add(ONE_READER) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_shared_slow(&self, recursive: bool) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_shared_fast
            #[allow(clippy::collapsible_if)]
            if state & WRITER_BIT != 0 {
                if !recursive || state & READERS_MASK == 0 {
                    return false;
                }
            }
            if have_elision() && state == 0 {
                match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            } else {
                match self.state.compare_exchange_weak(
                    state,
                    state
                        .checked_add(ONE_READER)
                        .expect("RwLock reader count overflow"),
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => state = x,
                }
            }
        }
    }

    #[inline(always)]
    fn try_lock_upgradable_fast(&self) -> bool {
        let state = self.state.load(Ordering::Relaxed);

        // We can't grab an upgradable lock if there is already a writer or
        // upgradable reader.
        if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
            return false;
        }

        if let Some(new_state) = state.checked_add(ONE_READER | UPGRADABLE_BIT) {
            self.state
                .compare_exchange_weak(state, new_state, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
        } else {
            false
        }
    }

    #[cold]
    fn try_lock_upgradable_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // This mirrors the condition in try_lock_upgradable_fast
            if state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                return false;
            }

            match self.state.compare_exchange_weak(
                state,
                state
                    .checked_add(ONE_READER | UPGRADABLE_BIT)
                    .expect("RwLock reader count overflow"),
                Ordering::Acquire,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn lock_exclusive_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                // Grab WRITER_BIT if it isn't set, even if there are parked threads.
                match self.state.compare_exchange_weak(
                    *state,
                    *state | WRITER_BIT,
                    Ordering::Acquire,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return true,
                    Err(x) => *state = x,
                }
            }
        };

        // Step 1: grab exclusive ownership of WRITER_BIT
        let timed_out = !self.lock_common(
            timeout,
            TOKEN_EXCLUSIVE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        );
        if timed_out {
            return false;
        }

        // Step 2: wait for all remaining readers to exit the lock.
        self.wait_for_readers(timeout, 0)
    }

    #[cold]
    fn unlock_exclusive_slow(&self, force_fair: bool) {
        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |mut new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            if result.unparked_threads != 0 && (force_fair || result.be_fair) {
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                }
                self.state.store(new_state, Ordering::Release);
                TOKEN_HANDOFF
            } else {
                // Clear the parked bit if there are no more parked threads.
                if result.have_more_threads {
                    self.state.store(PARKED_BIT, Ordering::Release);
                } else {
                    self.state.store(0, Ordering::Release);
                }
                TOKEN_NORMAL
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn lock_shared_slow(&self, recursive: bool, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                // Use hardware lock elision to avoid cache conflicts when multiple
                // readers try to acquire the lock. We only do this if the lock is
                // completely empty since elision handles conflicts poorly.
                if have_elision() && *state == 0 {
                    match self.state.elision_compare_exchange_acquire(0, ONE_READER) {
                        Ok(_) => return true,
                        Err(x) => *state = x,
                    }
                }

                // This is the same condition as try_lock_shared_fast
                #[allow(clippy::collapsible_if)]
                if *state & WRITER_BIT != 0 {
                    if !recursive || *state & READERS_MASK == 0 {
                        return false;
                    }
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(timeout, TOKEN_SHARED, try_lock, WRITER_BIT)
    }

    #[cold]
    fn unlock_shared_slow(&self) {
        // At this point WRITER_PARKED_BIT is set and READERS_MASK is empty. We
        // just need to wake up a potentially sleeping pending writer.
        // Using the 2nd key at addr + 1
        let addr = self as *const _ as usize + 1;
        let callback = |_result: UnparkResult| {
            // Clear the WRITER_PARKED_BIT here since there can only be one
            // parked writer thread.
            self.state.fetch_and(!WRITER_PARKED_BIT, Ordering::Relaxed);
            TOKEN_NORMAL
        };
        // SAFETY:
        // * `addr` is an address we control.
        // * `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            parking_lot_core::unpark_one(addr, callback);
        }
    }

    #[cold]
    fn lock_upgradable_slow(&self, timeout: Option<Instant>) -> bool {
        let try_lock = |state: &mut usize| {
            let mut spinwait_shared = SpinWait::new();
            loop {
                if *state & (WRITER_BIT | UPGRADABLE_BIT) != 0 {
                    return false;
                }

                if self
                    .state
                    .compare_exchange_weak(
                        *state,
                        state
                            .checked_add(ONE_READER | UPGRADABLE_BIT)
                            .expect("RwLock reader count overflow"),
                        Ordering::Acquire,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    return true;
                }

                // If there is high contention on the reader count then we want
                // to leave some time between attempts to acquire the lock to
                // let other threads make progress.
                spinwait_shared.spin_no_yield();
                *state = self.state.load(Ordering::Relaxed);
            }
        };
        self.lock_common(
            timeout,
            TOKEN_UPGRADABLE,
            try_lock,
            WRITER_BIT | UPGRADABLE_BIT,
        )
    }

    #[cold]
    fn unlock_upgradable_slow(&self, force_fair: bool) {
        // Just release the lock if there are no parked threads.
        let mut state = self.state.load(Ordering::Relaxed);
        while state & PARKED_BIT == 0 {
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => return,
                Err(x) => state = x,
            }
        }

        // There are threads to unpark. Try to unpark as many as we can.
        let callback = |new_state, result: UnparkResult| {
            // If we are using a fair unlock then we should keep the
            // rwlock locked and hand it off to the unparked threads.
            let mut state = self.state.load(Ordering::Relaxed);
            if force_fair || result.be_fair {
                // Fall back to normal unpark on overflow. Panicking is
                // not allowed in parking_lot callbacks.
                while let Some(mut new_state) =
                    (state - (ONE_READER | UPGRADABLE_BIT)).checked_add(new_state)
                {
                    if result.have_more_threads {
                        new_state |= PARKED_BIT;
                    } else {
                        new_state &= !PARKED_BIT;
                    }
                    match self.state.compare_exchange_weak(
                        state,
                        new_state,
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    ) {
                        Ok(_) => return TOKEN_HANDOFF,
                        Err(x) => state = x,
                    }
                }
            }

            // Otherwise just release the upgradable lock and update PARKED_BIT.
            loop {
                let mut new_state = state - (ONE_READER | UPGRADABLE_BIT);
                if result.have_more_threads {
                    new_state |= PARKED_BIT;
                } else {
                    new_state &= !PARKED_BIT;
                }
                match self.state.compare_exchange_weak(
                    state,
                    new_state,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    Ok(_) => return TOKEN_NORMAL,
                    Err(x) => state = x,
                }
            }
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(0, callback);
        }
    }

    #[cold]
    fn try_upgrade_slow(&self) -> bool {
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            if state & READERS_MASK != ONE_READER {
                return false;
            }
            match self.state.compare_exchange_weak(
                state,
                state - (ONE_READER | UPGRADABLE_BIT) + WRITER_BIT,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return true,
                Err(x) => state = x,
            }
        }
    }

    #[cold]
    fn upgrade_slow(&self, timeout: Option<Instant>) -> bool {
        self.wait_for_readers(timeout, ONE_READER | UPGRADABLE_BIT)
    }

    #[cold]
    fn downgrade_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER, callback);
        }
    }

    #[cold]
    fn downgrade_to_upgradable_slow(&self) {
        // We only reach this point if PARKED_BIT is set.
        let callback = |_, result: UnparkResult| {
            // Clear the parked bit if there are no more parked threads
            if !result.have_more_threads {
                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
            }
            TOKEN_NORMAL
        };
        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
        unsafe {
            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
        }
    }

    #[cold]
    unsafe fn bump_shared_slow(&self) {
        self.unlock_shared();
        self.lock_shared();
    }

    #[cold]
    fn bump_exclusive_slow(&self) {
        self.deadlock_release();
        self.unlock_exclusive_slow(true);
        self.lock_exclusive();
    }

    #[cold]
    fn bump_upgradable_slow(&self) {
        self.deadlock_release();
        self.unlock_upgradable_slow(true);
        self.lock_upgradable();
    }

    /// Common code for waking up parked threads after releasing `WRITER_BIT` or
    /// `UPGRADABLE_BIT`.
    ///
    /// # Safety
    ///
    /// `callback` must uphold the requirements of the `callback` parameter to
    /// `parking_lot_core::unpark_filter`. Meaning no panics or calls into any function in
    /// `parking_lot`.
    #[inline]
    unsafe fn wake_parked_threads(
        &self,
        new_state: usize,
        callback: impl FnOnce(usize, UnparkResult) -> UnparkToken,
    ) {
        // We must wake up at least one upgrader or writer if there is one,
        // otherwise they may end up parked indefinitely since unlock_shared
        // does not call wake_parked_threads.
        let new_state = Cell::new(new_state);
        let addr = self as *const _ as usize;
        let filter = |ParkToken(token)| {
            let s = new_state.get();

            // If we are waking up a writer, don't wake anything else.
            if s & WRITER_BIT != 0 {
                return FilterOp::Stop;
            }

            // Otherwise wake *all* readers and one upgrader/writer.
            if token & (UPGRADABLE_BIT | WRITER_BIT) != 0 && s & UPGRADABLE_BIT != 0 {
                // Skip writers and upgradable readers if we already have
                // a writer/upgradable reader.
                FilterOp::Skip
            } else {
                new_state.set(s + token);
                FilterOp::Unpark
            }
        };
        let callback = |result| callback(new_state.get(), result);
        // SAFETY:
        // * `addr` is an address we control.
        // * `filter` does not panic or call into any function of `parking_lot`.
        // * `callback` safety responsibility is on caller
        parking_lot_core::unpark_filter(addr, filter, callback);
    }

    // Common code for waiting for readers to exit the lock after acquiring
    // WRITER_BIT.
    #[inline]
    fn wait_for_readers(&self, timeout: Option<Instant>, prev_value: usize) -> bool {
        // At this point WRITER_BIT is already set, we just need to wait for the
        // remaining readers to exit the lock.
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Acquire);
        while state & READERS_MASK != 0 {
            // Spin a few times to wait for readers to exit
            if spinwait.spin() {
                state = self.state.load(Ordering::Acquire);
                continue;
            }

            // Set the parked bit
            if state & WRITER_PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | WRITER_PARKED_BIT,
                    Ordering::Acquire,
                    Ordering::Acquire,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            // Using the 2nd key at addr + 1
            let addr = self as *const _ as usize + 1;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & READERS_MASK != 0 && state & WRITER_PARKED_BIT != 0
            };
            let before_sleep = || {};
            let timed_out = |_, _| {};
            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(
                    addr,
                    validate,
                    before_sleep,
                    timed_out,
                    TOKEN_EXCLUSIVE,
                    timeout,
                )
            };
            match park_result {
                // We still need to re-check the state if we are unparked
                // since a previous writer timing-out could have allowed
                // another reader to sneak in before we parked.
                ParkResult::Unparked(_) | ParkResult::Invalid => {
                    state = self.state.load(Ordering::Acquire);
                    continue;
                }

                // Timeout expired
                ParkResult::TimedOut => {
                    // We need to release WRITER_BIT and revert back to
                    // our previous value. We also wake up any threads that
                    // might be waiting on WRITER_BIT.
                    let state = self.state.fetch_add(
                        prev_value.wrapping_sub(WRITER_BIT | WRITER_PARKED_BIT),
                        Ordering::Relaxed,
                    );
                    if state & PARKED_BIT != 0 {
                        let callback = |_, result: UnparkResult| {
                            // Clear the parked bit if there are no more parked threads
                            if !result.have_more_threads {
                                self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                            }
                            TOKEN_NORMAL
                        };
                        // SAFETY: `callback` does not panic or call into any function of `parking_lot`.
                        unsafe {
                            self.wake_parked_threads(ONE_READER | UPGRADABLE_BIT, callback);
                        }
                    }
                    return false;
                }
            }
        }
        true
    }

    /// Common code for acquiring a lock
    #[inline]
    fn lock_common(
        &self,
        timeout: Option<Instant>,
        token: ParkToken,
        mut try_lock: impl FnMut(&mut usize) -> bool,
        validate_flags: usize,
    ) -> bool {
        let mut spinwait = SpinWait::new();
        let mut state = self.state.load(Ordering::Relaxed);
        loop {
            // Attempt to grab the lock
            if try_lock(&mut state) {
                return true;
            }

            // If there are no parked threads, try spinning a few times.
            if state & (PARKED_BIT | WRITER_PARKED_BIT) == 0 && spinwait.spin() {
                state = self.state.load(Ordering::Relaxed);
                continue;
            }

            // Set the parked bit
            if state & PARKED_BIT == 0 {
                if let Err(x) = self.state.compare_exchange_weak(
                    state,
                    state | PARKED_BIT,
                    Ordering::Relaxed,
                    Ordering::Relaxed,
                ) {
                    state = x;
                    continue;
                }
            }

            // Park our thread until we are woken up by an unlock
            let addr = self as *const _ as usize;
            let validate = || {
                let state = self.state.load(Ordering::Relaxed);
                state & PARKED_BIT != 0 && (state & validate_flags != 0)
            };
            let before_sleep = || {};
            let timed_out = |_, was_last_thread| {
                // Clear the parked bit if we were the last parked thread
                if was_last_thread {
                    self.state.fetch_and(!PARKED_BIT, Ordering::Relaxed);
                }
            };

            // SAFETY:
            // * `addr` is an address we control.
            // * `validate`/`timed_out` does not panic or call into any function of `parking_lot`.
            // * `before_sleep` does not call `park`, nor does it panic.
            let park_result = unsafe {
                parking_lot_core::park(addr, validate, before_sleep, timed_out, token, timeout)
            };
            match park_result {
                // The thread that unparked us passed the lock on to us
                // directly without unlocking it.
                ParkResult::Unparked(TOKEN_HANDOFF) => return true,

                // We were unparked normally, try acquiring the lock again
                ParkResult::Unparked(_) => (),

                // The validation function failed, try locking again
                ParkResult::Invalid => (),

                // Timeout expired
                ParkResult::TimedOut => return false,
            }

            // Loop back and try locking again
            spinwait.reset();
            state = self.state.load(Ordering::Relaxed);
        }
    }

    #[inline]
    fn deadlock_acquire(&self) {
        unsafe { deadlock::acquire_resource(self as *const _ as usize) };
        unsafe { deadlock::acquire_resource(self as *const _ as usize + 1) };
    }

    #[inline]
    fn deadlock_release(&self) {
        unsafe { deadlock::release_resource(self as *const _ as usize) };
        unsafe { deadlock::release_resource(self as *const _ as usize + 1) };
    }
}
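
// An illustrative usage sketch: this raw lock is normally consumed through the
// `lock_api` wrappers rather than driven directly, so the test below simply
// wraps it in `lock_api::RwLock` (as parking_lot's public API does) and
// exercises the shared and exclusive fast paths. It assumes only the
// `lock_api` dependency already imported above; the module name and test are
// hypothetical, not part of the upstream crate.
#[cfg(test)]
mod usage_sketch {
    use super::RawRwLock;

    // The generic front end drives the raw lock through the traits implemented
    // in this file (lock_shared, lock_exclusive, unlock_*, ...).
    type RwLock<T> = lock_api::RwLock<RawRwLock, T>;

    #[test]
    fn shared_then_exclusive() {
        let lock = RwLock::new(5usize);
        {
            // Multiple shared guards may be held at the same time.
            let r1 = lock.read();
            let r2 = lock.read();
            assert_eq!(*r1 + *r2, 10);
        }
        // Once all readers are gone, an exclusive guard can be taken.
        *lock.write() += 1;
        assert_eq!(*lock.read(), 6);
    }
}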