#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
//! # Implementation Details.
//!
//! The semaphore is implemented using an intrusive linked list of waiters. An
//! atomic counter tracks the number of available permits. If the semaphore does
//! not contain the required number of permits, the task attempting to acquire
//! permits places its waker at the end of a queue. When new permits are made
//! available (such as by releasing an initial acquisition), they are assigned
//! to the task at the front of the queue, waking that task if its requested
//! number of permits is met.
//!
//! Because waiters are enqueued at the back of the linked list and dequeued
//! from the front, the semaphore is fair. Tasks trying to acquire large numbers
//! of permits at a time will always be woken eventually, even if many other
//! tasks are acquiring smaller numbers of permits. This means that in a
//! use-case like tokio's read-write lock, writers will not be starved by
//! readers.
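//!
//! As a rough, illustrative sketch (not compiled or tested here), the
//! crate-internal API is driven along these lines:
//!
//! ```text
//! let semaphore = Semaphore::new(2);
//!
//! // Take both permits synchronously; a third attempt fails with
//! // `TryAcquireError::NoPermits` instead of blocking.
//! semaphore.try_acquire(2).unwrap();
//! assert!(semaphore.try_acquire(1).is_err());
//!
//! // `acquire` returns an `Acquire` future that parks the task in the wait
//! // queue until enough permits have been released.
//! semaphore.release(2);
//! semaphore.acquire(1).await.unwrap();
//! ```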
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Mutex, MutexGuard};
use crate::util::linked_list::{self, LinkedList};
use crate::util::WakeList;

use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::*;
use std::task::Poll::*;
use std::task::{Context, Poll, Waker};
use std::{cmp, fmt};

/// An asynchronous counting semaphore which permits waiting on multiple permits at once.
pub(crate) struct Semaphore {
    waiters: Mutex<Waitlist>,
    /// The current number of available permits in the semaphore.
    permits: AtomicUsize,
}

struct Waitlist {
    queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
    closed: bool,
}

/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
#[derive(Debug, PartialEq)]
pub enum TryAcquireError {
    /// The semaphore has been [closed] and cannot issue new permits.
    ///
    /// [closed]: crate::sync::Semaphore::close
    Closed,

    /// The semaphore has no available permits.
    NoPermits,
}

/// Error returned from the [`Semaphore::acquire`] function.
///
/// An `acquire` operation can only fail if the semaphore has been
/// [closed].
///
/// [closed]: crate::sync::Semaphore::close
/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
#[derive(Debug)]
pub struct AcquireError(());

pub(crate) struct Acquire<'a> {
    node: Waiter,
    semaphore: &'a Semaphore,
    num_permits: u32,
    queued: bool,
}

/// An entry in the wait queue.
struct Waiter {
    /// The current state of the waiter.
    ///
    /// This is either the number of remaining permits required by
    /// the waiter, or a flag indicating that the waiter is not yet queued.
    state: AtomicUsize,

    /// The waker to notify the task awaiting permits.
    ///
    /// # Safety
    ///
    /// This may only be accessed while the wait queue is locked.
    waker: UnsafeCell<Option<Waker>>,

    /// Intrusive linked-list pointers.
    ///
    /// # Safety
    ///
    /// This may only be accessed while the wait queue is locked.
    ///
    /// TODO: Ideally, we would be able to use loom to enforce that
    /// this isn't accessed concurrently. However, it is difficult to
    /// use an `UnsafeCell` here, since the `Link` trait requires _returning_
    /// references to `Pointers`, and `UnsafeCell` requires that checked access
    /// take place inside a closure. We should consider changing `Pointers` to
    /// use `UnsafeCell` internally.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}
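
// Note on the permit counter encoding (see `CLOSED` and `PERMIT_SHIFT` below):
// the closed flag is stored in the least-significant bit of `permits`, and the
// permit count itself lives in the bits above it. As a worked example, a
// semaphore holding 5 permits stores `5 << PERMIT_SHIFT == 0b1010`; closing it
// ORs in the `CLOSED` bit, giving `0b1011`, while `available_permits()` shifts
// the flag back out and still reports 5.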

impl Semaphore {
    /// The maximum number of permits which a semaphore can hold.
    ///
    /// Note that this reserves three bits of flags in the permit counter, but
    /// we only actually use one of them. However, the previous semaphore
    /// implementation used three bits, so we will continue to reserve them to
    /// avoid a breaking change if additional flags need to be added in the
    /// future.
    pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3;
    const CLOSED: usize = 1;
    // The least-significant bit in the number of permits is reserved to use
    // as a flag indicating that the semaphore has been closed. Consequently
    // PERMIT_SHIFT is used to leave that bit for that purpose.
    const PERMIT_SHIFT: usize = 1;

    /// Creates a new semaphore with the initial number of permits.
    ///
    /// The maximum number of permits on 32-bit platforms is `1 << 29`.
    pub(crate) fn new(permits: usize) -> Self {
        assert!(
            permits <= Self::MAX_PERMITS,
            "a semaphore may not have more than MAX_PERMITS permits ({})",
            Self::MAX_PERMITS
        );
        Self {
            permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
            waiters: Mutex::new(Waitlist {
                queue: LinkedList::new(),
                closed: false,
            }),
        }
    }

    /// Creates a new semaphore with the initial number of permits.
    ///
    /// The maximum number of permits on 32-bit platforms is `1 << 29`.
    ///
    /// If the specified number of permits exceeds the maximum permit amount,
    /// then the value will get clamped to the maximum number of permits.
    #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
    pub(crate) const fn const_new(mut permits: usize) -> Self {
        // NOTE: assertions and by extension panics are still being worked on: https://github.com/rust-lang/rust/issues/74925
        // currently we just clamp the permit count when it exceeds the max
        if permits > Self::MAX_PERMITS {
            permits = Self::MAX_PERMITS;
        }

        Self {
            permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
            waiters: Mutex::const_new(Waitlist {
                queue: LinkedList::new(),
                closed: false,
            }),
        }
    }

    /// Returns the current number of available permits.
    pub(crate) fn available_permits(&self) -> usize {
        self.permits.load(Acquire) >> Self::PERMIT_SHIFT
    }

    /// Adds `added` new permits to the semaphore.
    ///
    /// The maximum number of permits is `usize::MAX >> 3`, and this function
    /// will panic if the limit is exceeded.
    pub(crate) fn release(&self, added: usize) {
        if added == 0 {
            return;
        }

        // Assign permits to the wait queue
        self.add_permits_locked(added, self.waiters.lock());
    }

    /// Closes the semaphore. This prevents the semaphore from issuing new
    /// permits and notifies all pending waiters.
    pub(crate) fn close(&self) {
        let mut waiters = self.waiters.lock();
        // If the semaphore's permits counter has enough permits for an
        // unqueued waiter to acquire all the permits it needs immediately,
        // it won't touch the wait list. Therefore, we have to set a bit on
        // the permit counter as well. However, we must do this while
        // holding the lock --- otherwise, if we set the bit and then wait
        // to acquire the lock we'll enter an inconsistent state where the
        // permit counter is closed, but the wait list is not.
        self.permits.fetch_or(Self::CLOSED, Release);
        waiters.closed = true;
        while let Some(mut waiter) = waiters.queue.pop_back() {
            let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
            if let Some(waker) = waker {
                waker.wake();
            }
        }
    }

    /// Returns true if the semaphore is closed.
    pub(crate) fn is_closed(&self) -> bool {
        self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
    }

    pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
        assert!(
            num_permits as usize <= Self::MAX_PERMITS,
            "a semaphore may not have more than MAX_PERMITS permits ({})",
            Self::MAX_PERMITS
        );
        let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
        let mut curr = self.permits.load(Acquire);
        loop {
            // Has the semaphore closed?
            if curr & Self::CLOSED == Self::CLOSED {
                return Err(TryAcquireError::Closed);
            }

            // Are there enough permits remaining?
            if curr < num_permits {
                return Err(TryAcquireError::NoPermits);
            }

            let next = curr - num_permits;

            match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
                Ok(_) => return Ok(()),
                Err(actual) => curr = actual,
            }
        }
    }

    pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> {
        Acquire::new(self, num_permits)
    }

    /// Release `rem` permits to the semaphore's wait list, starting from the
    /// end of the queue.
    ///
    /// If `rem` exceeds the number of permits needed by the wait list, the
    /// remainder are assigned back to the semaphore.
    fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
        let mut wakers = WakeList::new();
        let mut lock = Some(waiters);
        let mut is_empty = false;
        while rem > 0 {
            let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
            'inner: while wakers.can_push() {
                // Was the waiter assigned enough permits to wake it?
                match waiters.queue.last() {
                    Some(waiter) => {
                        if !waiter.assign_permits(&mut rem) {
                            break 'inner;
                        }
                    }
                    None => {
                        is_empty = true;
                        // If we assigned permits to all the waiters in the queue, and there are
                        // still permits left over, assign them back to the semaphore.
                        break 'inner;
                    }
                };
                let mut waiter = waiters.queue.pop_back().unwrap();
                if let Some(waker) =
                    unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
                {
                    wakers.push(waker);
                }
            }

            if rem > 0 && is_empty {
                let permits = rem;
                assert!(
                    permits <= Self::MAX_PERMITS,
                    "cannot add more than MAX_PERMITS permits ({})",
                    Self::MAX_PERMITS
                );
                let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
                let prev = prev >> Self::PERMIT_SHIFT;
                assert!(
                    prev + permits <= Self::MAX_PERMITS,
                    "number of added permits ({}) would overflow MAX_PERMITS ({})",
                    rem,
                    Self::MAX_PERMITS
                );
                rem = 0;
            }

            drop(waiters); // release the lock

            wakers.wake_all();
        }

        assert_eq!(rem, 0);
    }
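
    // An illustrative walk-through of `add_permits_locked` above (not a doc
    // test): suppose `release(3)` is called while the longest-waiting entry in
    // the queue still needs 2 permits and the next one needs 4. The first
    // waiter's `assign_permits` drains 2 of the 3 permits and returns `true`,
    // so it is popped and its waker collected. The second waiter absorbs the
    // remaining permit, returns `false`, and stays queued with 3 permits still
    // needed. Had the queue been empty instead, all 3 permits would have been
    // added back to the `permits` counter.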

    fn poll_acquire(
        &self,
        cx: &mut Context<'_>,
        num_permits: u32,
        node: Pin<&mut Waiter>,
        queued: bool,
    ) -> Poll<Result<(), AcquireError>> {
        let mut acquired = 0;

        let needed = if queued {
            node.state.load(Acquire) << Self::PERMIT_SHIFT
        } else {
            (num_permits as usize) << Self::PERMIT_SHIFT
        };

        let mut lock = None;
        // First, try to take the requested number of permits from the
        // semaphore.
        let mut curr = self.permits.load(Acquire);
        let mut waiters = loop {
            // Has the semaphore closed?
            if curr & Self::CLOSED > 0 {
                return Ready(Err(AcquireError::closed()));
            }

            let mut remaining = 0;
            let total = curr
                .checked_add(acquired)
                .expect("number of permits must not overflow");
            let (next, acq) = if total >= needed {
                let next = curr - (needed - acquired);
                (next, needed >> Self::PERMIT_SHIFT)
            } else {
                remaining = (needed - acquired) - curr;
                (0, curr >> Self::PERMIT_SHIFT)
            };

            if remaining > 0 && lock.is_none() {
                // No permits were immediately available, so this permit will
                // (probably) need to wait. We'll need to acquire a lock on the
                // wait queue before continuing. We need to do this _before_ the
                // CAS that sets the new value of the semaphore's `permits`
                // counter. Otherwise, if we subtract the permits and then
                // acquire the lock, we might miss additional permits being
                // added while waiting for the lock.
                lock = Some(self.waiters.lock());
            }

            match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
                Ok(_) => {
                    acquired += acq;
                    if remaining == 0 {
                        if !queued {
                            return Ready(Ok(()));
                        } else if lock.is_none() {
                            break self.waiters.lock();
                        }
                    }
                    break lock.expect("lock must be acquired before waiting");
                }
                Err(actual) => curr = actual,
            }
        };

        if waiters.closed {
            return Ready(Err(AcquireError::closed()));
        }

        if node.assign_permits(&mut acquired) {
            self.add_permits_locked(acquired, waiters);
            return Ready(Ok(()));
        }

        assert_eq!(acquired, 0);

        // Otherwise, register the waker & enqueue the node.
        node.waker.with_mut(|waker| {
            // Safety: the wait list is locked, so we may modify the waker.
            let waker = unsafe { &mut *waker };
            // Do we need to register the new waker?
            if waker
                .as_ref()
                .map(|waker| !waker.will_wake(cx.waker()))
                .unwrap_or(true)
            {
                *waker = Some(cx.waker().clone());
            }
        });

        // If the waiter is not already in the wait queue, enqueue it.
        if !queued {
            let node = unsafe {
                let node = Pin::into_inner_unchecked(node) as *mut _;
                NonNull::new_unchecked(node)
            };

            waiters.queue.push_front(node);
        }

        Pending
    }
}

impl fmt::Debug for Semaphore {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Semaphore")
            .field("permits", &self.available_permits())
            .finish()
    }
}

impl Waiter {
    fn new(num_permits: u32) -> Self {
        Waiter {
            waker: UnsafeCell::new(None),
            state: AtomicUsize::new(num_permits as usize),
            pointers: linked_list::Pointers::new(),
            _p: PhantomPinned,
        }
    }

    /// Assign permits to the waiter.
    ///
    /// Returns `true` if the waiter should be removed from the queue.
    fn assign_permits(&self, n: &mut usize) -> bool {
        let mut curr = self.state.load(Acquire);
        loop {
            let assign = cmp::min(curr, *n);
            let next = curr - assign;
            match self.state.compare_exchange(curr, next, AcqRel, Acquire) {
                Ok(_) => {
                    *n -= assign;
                    return next == 0;
                }
                Err(actual) => curr = actual,
            }
        }
    }
}

impl Future for Acquire<'_> {
    type Output = Result<(), AcquireError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // First, ensure the current task has enough budget to proceed.
        let coop = ready!(crate::coop::poll_proceed(cx));

        let (node, semaphore, needed, queued) = self.project();

        match semaphore.poll_acquire(cx, needed, node, *queued) {
            Pending => {
                *queued = true;
                Pending
            }
            Ready(r) => {
                coop.made_progress();
                r?;
                *queued = false;
                Ready(Ok(()))
            }
        }
    }
}

impl<'a> Acquire<'a> {
    fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self {
        Self {
            node: Waiter::new(num_permits),
            semaphore,
            num_permits,
            queued: false,
        }
    }

    fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) {
        fn is_unpin<T: Unpin>() {}
        unsafe {
            // Safety: all fields other than `node` are `Unpin`

            is_unpin::<&Semaphore>();
            is_unpin::<&mut bool>();
            is_unpin::<u32>();

            let this = self.get_unchecked_mut();
            (
                Pin::new_unchecked(&mut this.node),
                this.semaphore,
                this.num_permits,
                &mut this.queued,
            )
        }
    }
}

impl Drop for Acquire<'_> {
    fn drop(&mut self) {
        // If the future is completed, there is no node in the wait list, so we
        // can skip acquiring the lock.
        if !self.queued {
            return;
        }

        // This is where we ensure safety. The future is being dropped,
        // which means we must ensure that the waiter entry is no longer stored
        // in the linked list.
        let mut waiters = self.semaphore.waiters.lock();

        // remove the entry from the list
        let node = NonNull::from(&mut self.node);
        // Safety: we have locked the wait list.
        unsafe { waiters.queue.remove(node) };

        let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire);
        if acquired_permits > 0 {
            self.semaphore.add_permits_locked(acquired_permits, waiters);
        }
    }
}
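
// An illustrative note on cancellation (behaviour inferred from the `Drop`
// impl above, not a separate code path): if an `acquire(2)` future is dropped
// after being assigned one of its two permits while still queued, the drop
// handler removes its waiter from the list and computes
// `acquired_permits == 2 - 1 == 1`, handing that permit back to the semaphore
// via `add_permits_locked` so it is not leaked.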

// Safety: the `Acquire` future is not `Sync` automatically because it contains
// a `Waiter`, which, in turn, contains an `UnsafeCell`. However, the
// `UnsafeCell` is only accessed when the future is borrowed mutably (either in
// `poll` or in `drop`). Therefore, it is safe (although not particularly
// _useful_) for the future to be borrowed immutably across threads.
unsafe impl Sync for Acquire<'_> {}

// ===== impl AcquireError ====

impl AcquireError {
    fn closed() -> AcquireError {
        AcquireError(())
    }
}

impl fmt::Display for AcquireError {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "semaphore closed")
    }
}

impl std::error::Error for AcquireError {}

// ===== impl TryAcquireError =====

impl TryAcquireError {
    /// Returns `true` if the error was caused by a closed semaphore.
    #[allow(dead_code)] // may be used later!
    pub(crate) fn is_closed(&self) -> bool {
        matches!(self, TryAcquireError::Closed)
    }

    /// Returns `true` if the error was caused by calling `try_acquire` on a
    /// semaphore with no available permits.
    #[allow(dead_code)] // may be used later!
    pub(crate) fn is_no_permits(&self) -> bool {
        matches!(self, TryAcquireError::NoPermits)
    }
}

impl fmt::Display for TryAcquireError {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TryAcquireError::Closed => write!(fmt, "semaphore closed"),
            TryAcquireError::NoPermits => write!(fmt, "no permits available"),
        }
    }
}

impl std::error::Error for TryAcquireError {}

/// # Safety
///
/// `Waiter` is forced to be !Unpin.
unsafe impl linked_list::Link for Waiter {
    // XXX: ideally, we would be able to use `Pin` here, to enforce the
    // invariant that list entries may not move while in the list. However, we
    // can't do this currently, as using `Pin<&'a mut Waiter>` as the `Handle`
    // type would require `Semaphore` to be generic over a lifetime. We can't
    // use `Pin<*mut Waiter>`, as raw pointers are `Unpin` regardless of whether
    // or not they dereference to an `!Unpin` target.
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
        *handle
    }

    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        NonNull::from(&mut target.as_mut().pointers)
    }
}
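
// A minimal test sketch, assuming a standard (non-loom) `cfg(test)` build; the
// module name and the placement of these tests are illustrative only and do
// not reflect where the real test suite for this semaphore lives.
#[cfg(all(test, not(loom)))]
mod batch_semaphore_sketch_tests {
    use super::{Semaphore, TryAcquireError};

    #[test]
    fn permit_accounting_sketch() {
        let sem = Semaphore::new(1);
        assert_eq!(sem.available_permits(), 1);

        // The single permit can be taken synchronously...
        assert!(sem.try_acquire(1).is_ok());
        // ...after which further attempts fail without blocking.
        assert_eq!(sem.try_acquire(1), Err(TryAcquireError::NoPermits));

        // Releasing the permit makes it available again.
        sem.release(1);
        assert_eq!(sem.available_permits(), 1);

        // Closing the semaphore makes new acquisitions fail with `Closed`,
        // even though a permit is still available.
        sem.close();
        assert!(sem.is_closed());
        assert_eq!(sem.try_acquire(1), Err(TryAcquireError::Closed));
    }
}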