1 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] 2 //! # Implementation Details. 3 //! 4 //! The semaphore is implemented using an intrusive linked list of waiters. An 5 //! atomic counter tracks the number of available permits. If the semaphore does 6 //! not contain the required number of permits, the task attempting to acquire 7 //! permits places its waker at the end of a queue. When new permits are made 8 //! available (such as by releasing an initial acquisition), they are assigned 9 //! to the task at the front of the queue, waking that task if its requested 10 //! number of permits is met. 11 //! 12 //! Because waiters are enqueued at the back of the linked list and dequeued 13 //! from the front, the semaphore is fair. Tasks trying to acquire large numbers 14 //! of permits at a time will always be woken eventually, even if many other 15 //! tasks are acquiring smaller numbers of permits. This means that in a 16 //! use-case like tokio's read-write lock, writers will not be starved by 17 //! readers. 18 use crate::loom::cell::UnsafeCell; 19 use crate::loom::sync::atomic::AtomicUsize; 20 use crate::loom::sync::{Mutex, MutexGuard}; 21 use crate::util::linked_list::{self, LinkedList}; 22 #[cfg(all(tokio_unstable, feature = "tracing"))] 23 use crate::util::trace; 24 use crate::util::WakeList; 25 26 use std::future::Future; 27 use std::marker::PhantomPinned; 28 use std::pin::Pin; 29 use std::ptr::NonNull; 30 use std::sync::atomic::Ordering::*; 31 use std::task::Poll::*; 32 use std::task::{Context, Poll, Waker}; 33 use std::{cmp, fmt}; 34 35 /// An asynchronous counting semaphore which permits waiting on multiple permits at once. 36 pub(crate) struct Semaphore { 37 waiters: Mutex<Waitlist>, 38 /// The current number of available permits in the semaphore. 39 permits: AtomicUsize, 40 #[cfg(all(tokio_unstable, feature = "tracing"))] 41 resource_span: tracing::Span, 42 } 43 44 struct Waitlist { 45 queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>, 46 closed: bool, 47 } 48 49 /// Error returned from the [`Semaphore::try_acquire`] function. 50 /// 51 /// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire 52 #[derive(Debug, PartialEq, Eq)] 53 pub enum TryAcquireError { 54 /// The semaphore has been [closed] and cannot issue new permits. 55 /// 56 /// [closed]: crate::sync::Semaphore::close 57 Closed, 58 59 /// The semaphore has no available permits. 60 NoPermits, 61 } 62 /// Error returned from the [`Semaphore::acquire`] function. 63 /// 64 /// An `acquire` operation can only fail if the semaphore has been 65 /// [closed]. 66 /// 67 /// [closed]: crate::sync::Semaphore::close 68 /// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire 69 #[derive(Debug)] 70 pub struct AcquireError(()); 71 72 pub(crate) struct Acquire<'a> { 73 node: Waiter, 74 semaphore: &'a Semaphore, 75 num_permits: u32, 76 queued: bool, 77 } 78 79 /// An entry in the wait queue. 80 struct Waiter { 81 /// The current state of the waiter. 82 /// 83 /// This is either the number of remaining permits required by 84 /// the waiter, or a flag indicating that the waiter is not yet queued. 85 state: AtomicUsize, 86 87 /// The waker to notify the task awaiting permits. 88 /// 89 /// # Safety 90 /// 91 /// This may only be accessed while the wait queue is locked. 92 waker: UnsafeCell<Option<Waker>>, 93 94 /// Intrusive linked-list pointers. 95 /// 96 /// # Safety 97 /// 98 /// This may only be accessed while the wait queue is locked. 99 /// 100 /// TODO: Ideally, we would be able to use loom to enforce that 101 /// this isn't accessed concurrently. However, it is difficult to 102 /// use a `UnsafeCell` here, since the `Link` trait requires _returning_ 103 /// references to `Pointers`, and `UnsafeCell` requires that checked access 104 /// take place inside a closure. We should consider changing `Pointers` to 105 /// use `UnsafeCell` internally. 106 pointers: linked_list::Pointers<Waiter>, 107 108 #[cfg(all(tokio_unstable, feature = "tracing"))] 109 ctx: trace::AsyncOpTracingCtx, 110 111 /// Should not be `Unpin`. 112 _p: PhantomPinned, 113 } 114 115 generate_addr_of_methods! { 116 impl<> Waiter { 117 unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> { 118 &self.pointers 119 } 120 } 121 } 122 123 impl Semaphore { 124 /// The maximum number of permits which a semaphore can hold. 125 /// 126 /// Note that this reserves three bits of flags in the permit counter, but 127 /// we only actually use one of them. However, the previous semaphore 128 /// implementation used three bits, so we will continue to reserve them to 129 /// avoid a breaking change if additional flags need to be added in the 130 /// future. 131 pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3; 132 const CLOSED: usize = 1; 133 // The least-significant bit in the number of permits is reserved to use 134 // as a flag indicating that the semaphore has been closed. Consequently 135 // PERMIT_SHIFT is used to leave that bit for that purpose. 136 const PERMIT_SHIFT: usize = 1; 137 138 /// Creates a new semaphore with the initial number of permits 139 /// 140 /// Maximum number of permits on 32-bit platforms is `1<<29`. new(permits: usize) -> Self141 pub(crate) fn new(permits: usize) -> Self { 142 assert!( 143 permits <= Self::MAX_PERMITS, 144 "a semaphore may not have more than MAX_PERMITS permits ({})", 145 Self::MAX_PERMITS 146 ); 147 148 #[cfg(all(tokio_unstable, feature = "tracing"))] 149 let resource_span = { 150 let resource_span = tracing::trace_span!( 151 "runtime.resource", 152 concrete_type = "Semaphore", 153 kind = "Sync", 154 is_internal = true 155 ); 156 157 resource_span.in_scope(|| { 158 tracing::trace!( 159 target: "runtime::resource::state_update", 160 permits = permits, 161 permits.op = "override", 162 ) 163 }); 164 resource_span 165 }; 166 167 Self { 168 permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), 169 waiters: Mutex::new(Waitlist { 170 queue: LinkedList::new(), 171 closed: false, 172 }), 173 #[cfg(all(tokio_unstable, feature = "tracing"))] 174 resource_span, 175 } 176 } 177 178 /// Creates a new semaphore with the initial number of permits. 179 /// 180 /// Maximum number of permits on 32-bit platforms is `1<<29`. 181 #[cfg(not(all(loom, test)))] const_new(permits: usize) -> Self182 pub(crate) const fn const_new(permits: usize) -> Self { 183 assert!(permits <= Self::MAX_PERMITS); 184 185 Self { 186 permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT), 187 waiters: Mutex::const_new(Waitlist { 188 queue: LinkedList::new(), 189 closed: false, 190 }), 191 #[cfg(all(tokio_unstable, feature = "tracing"))] 192 resource_span: tracing::Span::none(), 193 } 194 } 195 196 /// Creates a new closed semaphore with 0 permits. new_closed() -> Self197 pub(crate) fn new_closed() -> Self { 198 Self { 199 permits: AtomicUsize::new(Self::CLOSED), 200 waiters: Mutex::new(Waitlist { 201 queue: LinkedList::new(), 202 closed: true, 203 }), 204 #[cfg(all(tokio_unstable, feature = "tracing"))] 205 resource_span: tracing::Span::none(), 206 } 207 } 208 209 /// Returns the current number of available permits. available_permits(&self) -> usize210 pub(crate) fn available_permits(&self) -> usize { 211 self.permits.load(Acquire) >> Self::PERMIT_SHIFT 212 } 213 214 /// Adds `added` new permits to the semaphore. 215 /// 216 /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded. release(&self, added: usize)217 pub(crate) fn release(&self, added: usize) { 218 if added == 0 { 219 return; 220 } 221 222 // Assign permits to the wait queue 223 self.add_permits_locked(added, self.waiters.lock()); 224 } 225 226 /// Closes the semaphore. This prevents the semaphore from issuing new 227 /// permits and notifies all pending waiters. close(&self)228 pub(crate) fn close(&self) { 229 let mut waiters = self.waiters.lock(); 230 // If the semaphore's permits counter has enough permits for an 231 // unqueued waiter to acquire all the permits it needs immediately, 232 // it won't touch the wait list. Therefore, we have to set a bit on 233 // the permit counter as well. However, we must do this while 234 // holding the lock --- otherwise, if we set the bit and then wait 235 // to acquire the lock we'll enter an inconsistent state where the 236 // permit counter is closed, but the wait list is not. 237 self.permits.fetch_or(Self::CLOSED, Release); 238 waiters.closed = true; 239 while let Some(mut waiter) = waiters.queue.pop_back() { 240 let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }; 241 if let Some(waker) = waker { 242 waker.wake(); 243 } 244 } 245 } 246 247 /// Returns true if the semaphore is closed. is_closed(&self) -> bool248 pub(crate) fn is_closed(&self) -> bool { 249 self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED 250 } 251 try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError>252 pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> { 253 assert!( 254 num_permits as usize <= Self::MAX_PERMITS, 255 "a semaphore may not have more than MAX_PERMITS permits ({})", 256 Self::MAX_PERMITS 257 ); 258 let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT; 259 let mut curr = self.permits.load(Acquire); 260 loop { 261 // Has the semaphore closed? 262 if curr & Self::CLOSED == Self::CLOSED { 263 return Err(TryAcquireError::Closed); 264 } 265 266 // Are there enough permits remaining? 267 if curr < num_permits { 268 return Err(TryAcquireError::NoPermits); 269 } 270 271 let next = curr - num_permits; 272 273 match self.permits.compare_exchange(curr, next, AcqRel, Acquire) { 274 Ok(_) => { 275 // TODO: Instrument once issue has been solved 276 return Ok(()); 277 } 278 Err(actual) => curr = actual, 279 } 280 } 281 } 282 acquire(&self, num_permits: u32) -> Acquire<'_>283 pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> { 284 Acquire::new(self, num_permits) 285 } 286 287 /// Release `rem` permits to the semaphore's wait list, starting from the 288 /// end of the queue. 289 /// 290 /// If `rem` exceeds the number of permits needed by the wait list, the 291 /// remainder are assigned back to the semaphore. add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>)292 fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) { 293 let mut wakers = WakeList::new(); 294 let mut lock = Some(waiters); 295 let mut is_empty = false; 296 while rem > 0 { 297 let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock()); 298 'inner: while wakers.can_push() { 299 // Was the waiter assigned enough permits to wake it? 300 match waiters.queue.last() { 301 Some(waiter) => { 302 if !waiter.assign_permits(&mut rem) { 303 break 'inner; 304 } 305 } 306 None => { 307 is_empty = true; 308 // If we assigned permits to all the waiters in the queue, and there are 309 // still permits left over, assign them back to the semaphore. 310 break 'inner; 311 } 312 }; 313 let mut waiter = waiters.queue.pop_back().unwrap(); 314 if let Some(waker) = 315 unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) } 316 { 317 wakers.push(waker); 318 } 319 } 320 321 if rem > 0 && is_empty { 322 let permits = rem; 323 assert!( 324 permits <= Self::MAX_PERMITS, 325 "cannot add more than MAX_PERMITS permits ({})", 326 Self::MAX_PERMITS 327 ); 328 let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release); 329 let prev = prev >> Self::PERMIT_SHIFT; 330 assert!( 331 prev + permits <= Self::MAX_PERMITS, 332 "number of added permits ({}) would overflow MAX_PERMITS ({})", 333 rem, 334 Self::MAX_PERMITS 335 ); 336 337 // add remaining permits back 338 #[cfg(all(tokio_unstable, feature = "tracing"))] 339 self.resource_span.in_scope(|| { 340 tracing::trace!( 341 target: "runtime::resource::state_update", 342 permits = rem, 343 permits.op = "add", 344 ) 345 }); 346 347 rem = 0; 348 } 349 350 drop(waiters); // release the lock 351 352 wakers.wake_all(); 353 } 354 355 assert_eq!(rem, 0); 356 } 357 poll_acquire( &self, cx: &mut Context<'_>, num_permits: u32, node: Pin<&mut Waiter>, queued: bool, ) -> Poll<Result<(), AcquireError>>358 fn poll_acquire( 359 &self, 360 cx: &mut Context<'_>, 361 num_permits: u32, 362 node: Pin<&mut Waiter>, 363 queued: bool, 364 ) -> Poll<Result<(), AcquireError>> { 365 let mut acquired = 0; 366 367 let needed = if queued { 368 node.state.load(Acquire) << Self::PERMIT_SHIFT 369 } else { 370 (num_permits as usize) << Self::PERMIT_SHIFT 371 }; 372 373 let mut lock = None; 374 // First, try to take the requested number of permits from the 375 // semaphore. 376 let mut curr = self.permits.load(Acquire); 377 let mut waiters = loop { 378 // Has the semaphore closed? 379 if curr & Self::CLOSED > 0 { 380 return Ready(Err(AcquireError::closed())); 381 } 382 383 let mut remaining = 0; 384 let total = curr 385 .checked_add(acquired) 386 .expect("number of permits must not overflow"); 387 let (next, acq) = if total >= needed { 388 let next = curr - (needed - acquired); 389 (next, needed >> Self::PERMIT_SHIFT) 390 } else { 391 remaining = (needed - acquired) - curr; 392 (0, curr >> Self::PERMIT_SHIFT) 393 }; 394 395 if remaining > 0 && lock.is_none() { 396 // No permits were immediately available, so this permit will 397 // (probably) need to wait. We'll need to acquire a lock on the 398 // wait queue before continuing. We need to do this _before_ the 399 // CAS that sets the new value of the semaphore's `permits` 400 // counter. Otherwise, if we subtract the permits and then 401 // acquire the lock, we might miss additional permits being 402 // added while waiting for the lock. 403 lock = Some(self.waiters.lock()); 404 } 405 406 match self.permits.compare_exchange(curr, next, AcqRel, Acquire) { 407 Ok(_) => { 408 acquired += acq; 409 if remaining == 0 { 410 if !queued { 411 #[cfg(all(tokio_unstable, feature = "tracing"))] 412 self.resource_span.in_scope(|| { 413 tracing::trace!( 414 target: "runtime::resource::state_update", 415 permits = acquired, 416 permits.op = "sub", 417 ); 418 tracing::trace!( 419 target: "runtime::resource::async_op::state_update", 420 permits_obtained = acquired, 421 permits.op = "add", 422 ) 423 }); 424 425 return Ready(Ok(())); 426 } else if lock.is_none() { 427 break self.waiters.lock(); 428 } 429 } 430 break lock.expect("lock must be acquired before waiting"); 431 } 432 Err(actual) => curr = actual, 433 } 434 }; 435 436 if waiters.closed { 437 return Ready(Err(AcquireError::closed())); 438 } 439 440 #[cfg(all(tokio_unstable, feature = "tracing"))] 441 self.resource_span.in_scope(|| { 442 tracing::trace!( 443 target: "runtime::resource::state_update", 444 permits = acquired, 445 permits.op = "sub", 446 ) 447 }); 448 449 if node.assign_permits(&mut acquired) { 450 self.add_permits_locked(acquired, waiters); 451 return Ready(Ok(())); 452 } 453 454 assert_eq!(acquired, 0); 455 let mut old_waker = None; 456 457 // Otherwise, register the waker & enqueue the node. 458 node.waker.with_mut(|waker| { 459 // Safety: the wait list is locked, so we may modify the waker. 460 let waker = unsafe { &mut *waker }; 461 // Do we need to register the new waker? 462 if waker 463 .as_ref() 464 .map(|waker| !waker.will_wake(cx.waker())) 465 .unwrap_or(true) 466 { 467 old_waker = std::mem::replace(waker, Some(cx.waker().clone())); 468 } 469 }); 470 471 // If the waiter is not already in the wait queue, enqueue it. 472 if !queued { 473 let node = unsafe { 474 let node = Pin::into_inner_unchecked(node) as *mut _; 475 NonNull::new_unchecked(node) 476 }; 477 478 waiters.queue.push_front(node); 479 } 480 drop(waiters); 481 drop(old_waker); 482 483 Pending 484 } 485 } 486 487 impl fmt::Debug for Semaphore { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result488 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 489 fmt.debug_struct("Semaphore") 490 .field("permits", &self.available_permits()) 491 .finish() 492 } 493 } 494 495 impl Waiter { new( num_permits: u32, #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx, ) -> Self496 fn new( 497 num_permits: u32, 498 #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx, 499 ) -> Self { 500 Waiter { 501 waker: UnsafeCell::new(None), 502 state: AtomicUsize::new(num_permits as usize), 503 pointers: linked_list::Pointers::new(), 504 #[cfg(all(tokio_unstable, feature = "tracing"))] 505 ctx, 506 _p: PhantomPinned, 507 } 508 } 509 510 /// Assign permits to the waiter. 511 /// 512 /// Returns `true` if the waiter should be removed from the queue assign_permits(&self, n: &mut usize) -> bool513 fn assign_permits(&self, n: &mut usize) -> bool { 514 let mut curr = self.state.load(Acquire); 515 loop { 516 let assign = cmp::min(curr, *n); 517 let next = curr - assign; 518 match self.state.compare_exchange(curr, next, AcqRel, Acquire) { 519 Ok(_) => { 520 *n -= assign; 521 #[cfg(all(tokio_unstable, feature = "tracing"))] 522 self.ctx.async_op_span.in_scope(|| { 523 tracing::trace!( 524 target: "runtime::resource::async_op::state_update", 525 permits_obtained = assign, 526 permits.op = "add", 527 ); 528 }); 529 return next == 0; 530 } 531 Err(actual) => curr = actual, 532 } 533 } 534 } 535 } 536 537 impl Future for Acquire<'_> { 538 type Output = Result<(), AcquireError>; 539 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>540 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 541 #[cfg(all(tokio_unstable, feature = "tracing"))] 542 let _resource_span = self.node.ctx.resource_span.clone().entered(); 543 #[cfg(all(tokio_unstable, feature = "tracing"))] 544 let _async_op_span = self.node.ctx.async_op_span.clone().entered(); 545 #[cfg(all(tokio_unstable, feature = "tracing"))] 546 let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered(); 547 548 let (node, semaphore, needed, queued) = self.project(); 549 550 // First, ensure the current task has enough budget to proceed. 551 #[cfg(all(tokio_unstable, feature = "tracing"))] 552 let coop = ready!(trace_poll_op!( 553 "poll_acquire", 554 crate::runtime::coop::poll_proceed(cx), 555 )); 556 557 #[cfg(not(all(tokio_unstable, feature = "tracing")))] 558 let coop = ready!(crate::runtime::coop::poll_proceed(cx)); 559 560 let result = match semaphore.poll_acquire(cx, needed, node, *queued) { 561 Pending => { 562 *queued = true; 563 Pending 564 } 565 Ready(r) => { 566 coop.made_progress(); 567 r?; 568 *queued = false; 569 Ready(Ok(())) 570 } 571 }; 572 573 #[cfg(all(tokio_unstable, feature = "tracing"))] 574 return trace_poll_op!("poll_acquire", result); 575 576 #[cfg(not(all(tokio_unstable, feature = "tracing")))] 577 return result; 578 } 579 } 580 581 impl<'a> Acquire<'a> { new(semaphore: &'a Semaphore, num_permits: u32) -> Self582 fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self { 583 #[cfg(any(not(tokio_unstable), not(feature = "tracing")))] 584 return Self { 585 node: Waiter::new(num_permits), 586 semaphore, 587 num_permits, 588 queued: false, 589 }; 590 591 #[cfg(all(tokio_unstable, feature = "tracing"))] 592 return semaphore.resource_span.in_scope(|| { 593 let async_op_span = 594 tracing::trace_span!("runtime.resource.async_op", source = "Acquire::new"); 595 let async_op_poll_span = async_op_span.in_scope(|| { 596 tracing::trace!( 597 target: "runtime::resource::async_op::state_update", 598 permits_requested = num_permits, 599 permits.op = "override", 600 ); 601 602 tracing::trace!( 603 target: "runtime::resource::async_op::state_update", 604 permits_obtained = 0usize, 605 permits.op = "override", 606 ); 607 608 tracing::trace_span!("runtime.resource.async_op.poll") 609 }); 610 611 let ctx = trace::AsyncOpTracingCtx { 612 async_op_span, 613 async_op_poll_span, 614 resource_span: semaphore.resource_span.clone(), 615 }; 616 617 Self { 618 node: Waiter::new(num_permits, ctx), 619 semaphore, 620 num_permits, 621 queued: false, 622 } 623 }); 624 } 625 project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool)626 fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) { 627 fn is_unpin<T: Unpin>() {} 628 unsafe { 629 // Safety: all fields other than `node` are `Unpin` 630 631 is_unpin::<&Semaphore>(); 632 is_unpin::<&mut bool>(); 633 is_unpin::<u32>(); 634 635 let this = self.get_unchecked_mut(); 636 ( 637 Pin::new_unchecked(&mut this.node), 638 this.semaphore, 639 this.num_permits, 640 &mut this.queued, 641 ) 642 } 643 } 644 } 645 646 impl Drop for Acquire<'_> { drop(&mut self)647 fn drop(&mut self) { 648 // If the future is completed, there is no node in the wait list, so we 649 // can skip acquiring the lock. 650 if !self.queued { 651 return; 652 } 653 654 // This is where we ensure safety. The future is being dropped, 655 // which means we must ensure that the waiter entry is no longer stored 656 // in the linked list. 657 let mut waiters = self.semaphore.waiters.lock(); 658 659 // remove the entry from the list 660 let node = NonNull::from(&mut self.node); 661 // Safety: we have locked the wait list. 662 unsafe { waiters.queue.remove(node) }; 663 664 let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire); 665 if acquired_permits > 0 { 666 self.semaphore.add_permits_locked(acquired_permits, waiters); 667 } 668 } 669 } 670 671 // Safety: the `Acquire` future is not `Sync` automatically because it contains 672 // a `Waiter`, which, in turn, contains an `UnsafeCell`. However, the 673 // `UnsafeCell` is only accessed when the future is borrowed mutably (either in 674 // `poll` or in `drop`). Therefore, it is safe (although not particularly 675 // _useful_) for the future to be borrowed immutably across threads. 676 unsafe impl Sync for Acquire<'_> {} 677 678 // ===== impl AcquireError ==== 679 680 impl AcquireError { closed() -> AcquireError681 fn closed() -> AcquireError { 682 AcquireError(()) 683 } 684 } 685 686 impl fmt::Display for AcquireError { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result687 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 688 write!(fmt, "semaphore closed") 689 } 690 } 691 692 impl std::error::Error for AcquireError {} 693 694 // ===== impl TryAcquireError ===== 695 696 impl TryAcquireError { 697 /// Returns `true` if the error was caused by a closed semaphore. 698 #[allow(dead_code)] // may be used later! is_closed(&self) -> bool699 pub(crate) fn is_closed(&self) -> bool { 700 matches!(self, TryAcquireError::Closed) 701 } 702 703 /// Returns `true` if the error was caused by calling `try_acquire` on a 704 /// semaphore with no available permits. 705 #[allow(dead_code)] // may be used later! is_no_permits(&self) -> bool706 pub(crate) fn is_no_permits(&self) -> bool { 707 matches!(self, TryAcquireError::NoPermits) 708 } 709 } 710 711 impl fmt::Display for TryAcquireError { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result712 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 713 match self { 714 TryAcquireError::Closed => write!(fmt, "semaphore closed"), 715 TryAcquireError::NoPermits => write!(fmt, "no permits available"), 716 } 717 } 718 } 719 720 impl std::error::Error for TryAcquireError {} 721 722 /// # Safety 723 /// 724 /// `Waiter` is forced to be !Unpin. 725 unsafe impl linked_list::Link for Waiter { 726 type Handle = NonNull<Waiter>; 727 type Target = Waiter; 728 as_raw(handle: &Self::Handle) -> NonNull<Waiter>729 fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> { 730 *handle 731 } 732 from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter>733 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> { 734 ptr 735 } 736 pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>>737 unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> { 738 Waiter::addr_of_pointers(target) 739 } 740 } 741