• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
2 //! # Implementation Details.
3 //!
4 //! The semaphore is implemented using an intrusive linked list of waiters. An
5 //! atomic counter tracks the number of available permits. If the semaphore does
6 //! not contain the required number of permits, the task attempting to acquire
7 //! permits places its waker at the end of a queue. When new permits are made
8 //! available (such as by releasing an initial acquisition), they are assigned
9 //! to the task at the front of the queue, waking that task if its requested
10 //! number of permits is met.
11 //!
12 //! Because waiters are enqueued at the back of the linked list and dequeued
13 //! from the front, the semaphore is fair. Tasks trying to acquire large numbers
14 //! of permits at a time will always be woken eventually, even if many other
15 //! tasks are acquiring smaller numbers of permits. This means that in a
16 //! use-case like tokio's read-write lock, writers will not be starved by
17 //! readers.
18 use crate::loom::cell::UnsafeCell;
19 use crate::loom::sync::atomic::AtomicUsize;
20 use crate::loom::sync::{Mutex, MutexGuard};
21 use crate::util::linked_list::{self, LinkedList};
22 #[cfg(all(tokio_unstable, feature = "tracing"))]
23 use crate::util::trace;
24 use crate::util::WakeList;
25 
26 use std::future::Future;
27 use std::marker::PhantomPinned;
28 use std::pin::Pin;
29 use std::ptr::NonNull;
30 use std::sync::atomic::Ordering::*;
31 use std::task::Poll::*;
32 use std::task::{Context, Poll, Waker};
33 use std::{cmp, fmt};
34 
35 /// An asynchronous counting semaphore which permits waiting on multiple permits at once.
36 pub(crate) struct Semaphore {
37     waiters: Mutex<Waitlist>,
38     /// The current number of available permits in the semaphore.
39     permits: AtomicUsize,
40     #[cfg(all(tokio_unstable, feature = "tracing"))]
41     resource_span: tracing::Span,
42 }
43 
44 struct Waitlist {
45     queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
46     closed: bool,
47 }
48 
49 /// Error returned from the [`Semaphore::try_acquire`] function.
50 ///
51 /// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
52 #[derive(Debug, PartialEq, Eq)]
53 pub enum TryAcquireError {
54     /// The semaphore has been [closed] and cannot issue new permits.
55     ///
56     /// [closed]: crate::sync::Semaphore::close
57     Closed,
58 
59     /// The semaphore has no available permits.
60     NoPermits,
61 }
62 /// Error returned from the [`Semaphore::acquire`] function.
63 ///
64 /// An `acquire` operation can only fail if the semaphore has been
65 /// [closed].
66 ///
67 /// [closed]: crate::sync::Semaphore::close
68 /// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
69 #[derive(Debug)]
70 pub struct AcquireError(());
71 
72 pub(crate) struct Acquire<'a> {
73     node: Waiter,
74     semaphore: &'a Semaphore,
75     num_permits: u32,
76     queued: bool,
77 }
78 
79 /// An entry in the wait queue.
80 struct Waiter {
81     /// The current state of the waiter.
82     ///
83     /// This is either the number of remaining permits required by
84     /// the waiter, or a flag indicating that the waiter is not yet queued.
85     state: AtomicUsize,
86 
87     /// The waker to notify the task awaiting permits.
88     ///
89     /// # Safety
90     ///
91     /// This may only be accessed while the wait queue is locked.
92     waker: UnsafeCell<Option<Waker>>,
93 
94     /// Intrusive linked-list pointers.
95     ///
96     /// # Safety
97     ///
98     /// This may only be accessed while the wait queue is locked.
99     ///
100     /// TODO: Ideally, we would be able to use loom to enforce that
101     /// this isn't accessed concurrently. However, it is difficult to
102     /// use a `UnsafeCell` here, since the `Link` trait requires _returning_
103     /// references to `Pointers`, and `UnsafeCell` requires that checked access
104     /// take place inside a closure. We should consider changing `Pointers` to
105     /// use `UnsafeCell` internally.
106     pointers: linked_list::Pointers<Waiter>,
107 
108     #[cfg(all(tokio_unstable, feature = "tracing"))]
109     ctx: trace::AsyncOpTracingCtx,
110 
111     /// Should not be `Unpin`.
112     _p: PhantomPinned,
113 }
114 
115 generate_addr_of_methods! {
116     impl<> Waiter {
117         unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
118             &self.pointers
119         }
120     }
121 }
122 
123 impl Semaphore {
124     /// The maximum number of permits which a semaphore can hold.
125     ///
126     /// Note that this reserves three bits of flags in the permit counter, but
127     /// we only actually use one of them. However, the previous semaphore
128     /// implementation used three bits, so we will continue to reserve them to
129     /// avoid a breaking change if additional flags need to be added in the
130     /// future.
131     pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3;
132     const CLOSED: usize = 1;
133     // The least-significant bit in the number of permits is reserved to use
134     // as a flag indicating that the semaphore has been closed. Consequently
135     // PERMIT_SHIFT is used to leave that bit for that purpose.
136     const PERMIT_SHIFT: usize = 1;
137 
138     /// Creates a new semaphore with the initial number of permits
139     ///
140     /// Maximum number of permits on 32-bit platforms is `1<<29`.
new(permits: usize) -> Self141     pub(crate) fn new(permits: usize) -> Self {
142         assert!(
143             permits <= Self::MAX_PERMITS,
144             "a semaphore may not have more than MAX_PERMITS permits ({})",
145             Self::MAX_PERMITS
146         );
147 
148         #[cfg(all(tokio_unstable, feature = "tracing"))]
149         let resource_span = {
150             let resource_span = tracing::trace_span!(
151                 "runtime.resource",
152                 concrete_type = "Semaphore",
153                 kind = "Sync",
154                 is_internal = true
155             );
156 
157             resource_span.in_scope(|| {
158                 tracing::trace!(
159                     target: "runtime::resource::state_update",
160                     permits = permits,
161                     permits.op = "override",
162                 )
163             });
164             resource_span
165         };
166 
167         Self {
168             permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
169             waiters: Mutex::new(Waitlist {
170                 queue: LinkedList::new(),
171                 closed: false,
172             }),
173             #[cfg(all(tokio_unstable, feature = "tracing"))]
174             resource_span,
175         }
176     }
177 
178     /// Creates a new semaphore with the initial number of permits.
179     ///
180     /// Maximum number of permits on 32-bit platforms is `1<<29`.
181     ///
182     /// If the specified number of permits exceeds the maximum permit amount
183     /// Then the value will get clamped to the maximum number of permits.
184     #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
const_new(mut permits: usize) -> Self185     pub(crate) const fn const_new(mut permits: usize) -> Self {
186         // NOTE: assertions and by extension panics are still being worked on: https://github.com/rust-lang/rust/issues/74925
187         // currently we just clamp the permit count when it exceeds the max
188         permits &= Self::MAX_PERMITS;
189 
190         Self {
191             permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
192             waiters: Mutex::const_new(Waitlist {
193                 queue: LinkedList::new(),
194                 closed: false,
195             }),
196             #[cfg(all(tokio_unstable, feature = "tracing"))]
197             resource_span: tracing::Span::none(),
198         }
199     }
200 
201     /// Returns the current number of available permits.
available_permits(&self) -> usize202     pub(crate) fn available_permits(&self) -> usize {
203         self.permits.load(Acquire) >> Self::PERMIT_SHIFT
204     }
205 
206     /// Adds `added` new permits to the semaphore.
207     ///
208     /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
release(&self, added: usize)209     pub(crate) fn release(&self, added: usize) {
210         if added == 0 {
211             return;
212         }
213 
214         // Assign permits to the wait queue
215         self.add_permits_locked(added, self.waiters.lock());
216     }
217 
218     /// Closes the semaphore. This prevents the semaphore from issuing new
219     /// permits and notifies all pending waiters.
close(&self)220     pub(crate) fn close(&self) {
221         let mut waiters = self.waiters.lock();
222         // If the semaphore's permits counter has enough permits for an
223         // unqueued waiter to acquire all the permits it needs immediately,
224         // it won't touch the wait list. Therefore, we have to set a bit on
225         // the permit counter as well. However, we must do this while
226         // holding the lock --- otherwise, if we set the bit and then wait
227         // to acquire the lock we'll enter an inconsistent state where the
228         // permit counter is closed, but the wait list is not.
229         self.permits.fetch_or(Self::CLOSED, Release);
230         waiters.closed = true;
231         while let Some(mut waiter) = waiters.queue.pop_back() {
232             let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
233             if let Some(waker) = waker {
234                 waker.wake();
235             }
236         }
237     }
238 
239     /// Returns true if the semaphore is closed.
is_closed(&self) -> bool240     pub(crate) fn is_closed(&self) -> bool {
241         self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
242     }
243 
try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError>244     pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
245         assert!(
246             num_permits as usize <= Self::MAX_PERMITS,
247             "a semaphore may not have more than MAX_PERMITS permits ({})",
248             Self::MAX_PERMITS
249         );
250         let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
251         let mut curr = self.permits.load(Acquire);
252         loop {
253             // Has the semaphore closed?
254             if curr & Self::CLOSED == Self::CLOSED {
255                 return Err(TryAcquireError::Closed);
256             }
257 
258             // Are there enough permits remaining?
259             if curr < num_permits {
260                 return Err(TryAcquireError::NoPermits);
261             }
262 
263             let next = curr - num_permits;
264 
265             match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
266                 Ok(_) => {
267                     // TODO: Instrument once issue has been solved}
268                     return Ok(());
269                 }
270                 Err(actual) => curr = actual,
271             }
272         }
273     }
274 
acquire(&self, num_permits: u32) -> Acquire<'_>275     pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> {
276         Acquire::new(self, num_permits)
277     }
278 
279     /// Release `rem` permits to the semaphore's wait list, starting from the
280     /// end of the queue.
281     ///
282     /// If `rem` exceeds the number of permits needed by the wait list, the
283     /// remainder are assigned back to the semaphore.
add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>)284     fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
285         let mut wakers = WakeList::new();
286         let mut lock = Some(waiters);
287         let mut is_empty = false;
288         while rem > 0 {
289             let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
290             'inner: while wakers.can_push() {
291                 // Was the waiter assigned enough permits to wake it?
292                 match waiters.queue.last() {
293                     Some(waiter) => {
294                         if !waiter.assign_permits(&mut rem) {
295                             break 'inner;
296                         }
297                     }
298                     None => {
299                         is_empty = true;
300                         // If we assigned permits to all the waiters in the queue, and there are
301                         // still permits left over, assign them back to the semaphore.
302                         break 'inner;
303                     }
304                 };
305                 let mut waiter = waiters.queue.pop_back().unwrap();
306                 if let Some(waker) =
307                     unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
308                 {
309                     wakers.push(waker);
310                 }
311             }
312 
313             if rem > 0 && is_empty {
314                 let permits = rem;
315                 assert!(
316                     permits <= Self::MAX_PERMITS,
317                     "cannot add more than MAX_PERMITS permits ({})",
318                     Self::MAX_PERMITS
319                 );
320                 let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
321                 let prev = prev >> Self::PERMIT_SHIFT;
322                 assert!(
323                     prev + permits <= Self::MAX_PERMITS,
324                     "number of added permits ({}) would overflow MAX_PERMITS ({})",
325                     rem,
326                     Self::MAX_PERMITS
327                 );
328 
329                 // add remaining permits back
330                 #[cfg(all(tokio_unstable, feature = "tracing"))]
331                 self.resource_span.in_scope(|| {
332                     tracing::trace!(
333                     target: "runtime::resource::state_update",
334                     permits = rem,
335                     permits.op = "add",
336                     )
337                 });
338 
339                 rem = 0;
340             }
341 
342             drop(waiters); // release the lock
343 
344             wakers.wake_all();
345         }
346 
347         assert_eq!(rem, 0);
348     }
349 
poll_acquire( &self, cx: &mut Context<'_>, num_permits: u32, node: Pin<&mut Waiter>, queued: bool, ) -> Poll<Result<(), AcquireError>>350     fn poll_acquire(
351         &self,
352         cx: &mut Context<'_>,
353         num_permits: u32,
354         node: Pin<&mut Waiter>,
355         queued: bool,
356     ) -> Poll<Result<(), AcquireError>> {
357         let mut acquired = 0;
358 
359         let needed = if queued {
360             node.state.load(Acquire) << Self::PERMIT_SHIFT
361         } else {
362             (num_permits as usize) << Self::PERMIT_SHIFT
363         };
364 
365         let mut lock = None;
366         // First, try to take the requested number of permits from the
367         // semaphore.
368         let mut curr = self.permits.load(Acquire);
369         let mut waiters = loop {
370             // Has the semaphore closed?
371             if curr & Self::CLOSED > 0 {
372                 return Ready(Err(AcquireError::closed()));
373             }
374 
375             let mut remaining = 0;
376             let total = curr
377                 .checked_add(acquired)
378                 .expect("number of permits must not overflow");
379             let (next, acq) = if total >= needed {
380                 let next = curr - (needed - acquired);
381                 (next, needed >> Self::PERMIT_SHIFT)
382             } else {
383                 remaining = (needed - acquired) - curr;
384                 (0, curr >> Self::PERMIT_SHIFT)
385             };
386 
387             if remaining > 0 && lock.is_none() {
388                 // No permits were immediately available, so this permit will
389                 // (probably) need to wait. We'll need to acquire a lock on the
390                 // wait queue before continuing. We need to do this _before_ the
391                 // CAS that sets the new value of the semaphore's `permits`
392                 // counter. Otherwise, if we subtract the permits and then
393                 // acquire the lock, we might miss additional permits being
394                 // added while waiting for the lock.
395                 lock = Some(self.waiters.lock());
396             }
397 
398             match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
399                 Ok(_) => {
400                     acquired += acq;
401                     if remaining == 0 {
402                         if !queued {
403                             #[cfg(all(tokio_unstable, feature = "tracing"))]
404                             self.resource_span.in_scope(|| {
405                                 tracing::trace!(
406                                     target: "runtime::resource::state_update",
407                                     permits = acquired,
408                                     permits.op = "sub",
409                                 );
410                                 tracing::trace!(
411                                     target: "runtime::resource::async_op::state_update",
412                                     permits_obtained = acquired,
413                                     permits.op = "add",
414                                 )
415                             });
416 
417                             return Ready(Ok(()));
418                         } else if lock.is_none() {
419                             break self.waiters.lock();
420                         }
421                     }
422                     break lock.expect("lock must be acquired before waiting");
423                 }
424                 Err(actual) => curr = actual,
425             }
426         };
427 
428         if waiters.closed {
429             return Ready(Err(AcquireError::closed()));
430         }
431 
432         #[cfg(all(tokio_unstable, feature = "tracing"))]
433         self.resource_span.in_scope(|| {
434             tracing::trace!(
435                 target: "runtime::resource::state_update",
436                 permits = acquired,
437                 permits.op = "sub",
438             )
439         });
440 
441         if node.assign_permits(&mut acquired) {
442             self.add_permits_locked(acquired, waiters);
443             return Ready(Ok(()));
444         }
445 
446         assert_eq!(acquired, 0);
447 
448         // Otherwise, register the waker & enqueue the node.
449         node.waker.with_mut(|waker| {
450             // Safety: the wait list is locked, so we may modify the waker.
451             let waker = unsafe { &mut *waker };
452             // Do we need to register the new waker?
453             if waker
454                 .as_ref()
455                 .map(|waker| !waker.will_wake(cx.waker()))
456                 .unwrap_or(true)
457             {
458                 *waker = Some(cx.waker().clone());
459             }
460         });
461 
462         // If the waiter is not already in the wait queue, enqueue it.
463         if !queued {
464             let node = unsafe {
465                 let node = Pin::into_inner_unchecked(node) as *mut _;
466                 NonNull::new_unchecked(node)
467             };
468 
469             waiters.queue.push_front(node);
470         }
471 
472         Pending
473     }
474 }
475 
476 impl fmt::Debug for Semaphore {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result477     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
478         fmt.debug_struct("Semaphore")
479             .field("permits", &self.available_permits())
480             .finish()
481     }
482 }
483 
484 impl Waiter {
new( num_permits: u32, #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx, ) -> Self485     fn new(
486         num_permits: u32,
487         #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx,
488     ) -> Self {
489         Waiter {
490             waker: UnsafeCell::new(None),
491             state: AtomicUsize::new(num_permits as usize),
492             pointers: linked_list::Pointers::new(),
493             #[cfg(all(tokio_unstable, feature = "tracing"))]
494             ctx,
495             _p: PhantomPinned,
496         }
497     }
498 
499     /// Assign permits to the waiter.
500     ///
501     /// Returns `true` if the waiter should be removed from the queue
assign_permits(&self, n: &mut usize) -> bool502     fn assign_permits(&self, n: &mut usize) -> bool {
503         let mut curr = self.state.load(Acquire);
504         loop {
505             let assign = cmp::min(curr, *n);
506             let next = curr - assign;
507             match self.state.compare_exchange(curr, next, AcqRel, Acquire) {
508                 Ok(_) => {
509                     *n -= assign;
510                     #[cfg(all(tokio_unstable, feature = "tracing"))]
511                     self.ctx.async_op_span.in_scope(|| {
512                         tracing::trace!(
513                             target: "runtime::resource::async_op::state_update",
514                             permits_obtained = assign,
515                             permits.op = "add",
516                         );
517                     });
518                     return next == 0;
519                 }
520                 Err(actual) => curr = actual,
521             }
522         }
523     }
524 }
525 
526 impl Future for Acquire<'_> {
527     type Output = Result<(), AcquireError>;
528 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>529     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
530         #[cfg(all(tokio_unstable, feature = "tracing"))]
531         let _resource_span = self.node.ctx.resource_span.clone().entered();
532         #[cfg(all(tokio_unstable, feature = "tracing"))]
533         let _async_op_span = self.node.ctx.async_op_span.clone().entered();
534         #[cfg(all(tokio_unstable, feature = "tracing"))]
535         let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered();
536 
537         let (node, semaphore, needed, queued) = self.project();
538 
539         // First, ensure the current task has enough budget to proceed.
540         #[cfg(all(tokio_unstable, feature = "tracing"))]
541         let coop = ready!(trace_poll_op!(
542             "poll_acquire",
543             crate::runtime::coop::poll_proceed(cx),
544         ));
545 
546         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
547         let coop = ready!(crate::runtime::coop::poll_proceed(cx));
548 
549         let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
550             Pending => {
551                 *queued = true;
552                 Pending
553             }
554             Ready(r) => {
555                 coop.made_progress();
556                 r?;
557                 *queued = false;
558                 Ready(Ok(()))
559             }
560         };
561 
562         #[cfg(all(tokio_unstable, feature = "tracing"))]
563         return trace_poll_op!("poll_acquire", result);
564 
565         #[cfg(not(all(tokio_unstable, feature = "tracing")))]
566         return result;
567     }
568 }
569 
570 impl<'a> Acquire<'a> {
new(semaphore: &'a Semaphore, num_permits: u32) -> Self571     fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self {
572         #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
573         return Self {
574             node: Waiter::new(num_permits),
575             semaphore,
576             num_permits,
577             queued: false,
578         };
579 
580         #[cfg(all(tokio_unstable, feature = "tracing"))]
581         return semaphore.resource_span.in_scope(|| {
582             let async_op_span =
583                 tracing::trace_span!("runtime.resource.async_op", source = "Acquire::new");
584             let async_op_poll_span = async_op_span.in_scope(|| {
585                 tracing::trace!(
586                     target: "runtime::resource::async_op::state_update",
587                     permits_requested = num_permits,
588                     permits.op = "override",
589                 );
590 
591                 tracing::trace!(
592                     target: "runtime::resource::async_op::state_update",
593                     permits_obtained = 0usize,
594                     permits.op = "override",
595                 );
596 
597                 tracing::trace_span!("runtime.resource.async_op.poll")
598             });
599 
600             let ctx = trace::AsyncOpTracingCtx {
601                 async_op_span,
602                 async_op_poll_span,
603                 resource_span: semaphore.resource_span.clone(),
604             };
605 
606             Self {
607                 node: Waiter::new(num_permits, ctx),
608                 semaphore,
609                 num_permits,
610                 queued: false,
611             }
612         });
613     }
614 
project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool)615     fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) {
616         fn is_unpin<T: Unpin>() {}
617         unsafe {
618             // Safety: all fields other than `node` are `Unpin`
619 
620             is_unpin::<&Semaphore>();
621             is_unpin::<&mut bool>();
622             is_unpin::<u32>();
623 
624             let this = self.get_unchecked_mut();
625             (
626                 Pin::new_unchecked(&mut this.node),
627                 this.semaphore,
628                 this.num_permits,
629                 &mut this.queued,
630             )
631         }
632     }
633 }
634 
635 impl Drop for Acquire<'_> {
drop(&mut self)636     fn drop(&mut self) {
637         // If the future is completed, there is no node in the wait list, so we
638         // can skip acquiring the lock.
639         if !self.queued {
640             return;
641         }
642 
643         // This is where we ensure safety. The future is being dropped,
644         // which means we must ensure that the waiter entry is no longer stored
645         // in the linked list.
646         let mut waiters = self.semaphore.waiters.lock();
647 
648         // remove the entry from the list
649         let node = NonNull::from(&mut self.node);
650         // Safety: we have locked the wait list.
651         unsafe { waiters.queue.remove(node) };
652 
653         let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire);
654         if acquired_permits > 0 {
655             self.semaphore.add_permits_locked(acquired_permits, waiters);
656         }
657     }
658 }
659 
660 // Safety: the `Acquire` future is not `Sync` automatically because it contains
661 // a `Waiter`, which, in turn, contains an `UnsafeCell`. However, the
662 // `UnsafeCell` is only accessed when the future is borrowed mutably (either in
663 // `poll` or in `drop`). Therefore, it is safe (although not particularly
664 // _useful_) for the future to be borrowed immutably across threads.
665 unsafe impl Sync for Acquire<'_> {}
666 
667 // ===== impl AcquireError ====
668 
669 impl AcquireError {
closed() -> AcquireError670     fn closed() -> AcquireError {
671         AcquireError(())
672     }
673 }
674 
675 impl fmt::Display for AcquireError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result676     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
677         write!(fmt, "semaphore closed")
678     }
679 }
680 
681 impl std::error::Error for AcquireError {}
682 
683 // ===== impl TryAcquireError =====
684 
685 impl TryAcquireError {
686     /// Returns `true` if the error was caused by a closed semaphore.
687     #[allow(dead_code)] // may be used later!
is_closed(&self) -> bool688     pub(crate) fn is_closed(&self) -> bool {
689         matches!(self, TryAcquireError::Closed)
690     }
691 
692     /// Returns `true` if the error was caused by calling `try_acquire` on a
693     /// semaphore with no available permits.
694     #[allow(dead_code)] // may be used later!
is_no_permits(&self) -> bool695     pub(crate) fn is_no_permits(&self) -> bool {
696         matches!(self, TryAcquireError::NoPermits)
697     }
698 }
699 
700 impl fmt::Display for TryAcquireError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result701     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
702         match self {
703             TryAcquireError::Closed => write!(fmt, "semaphore closed"),
704             TryAcquireError::NoPermits => write!(fmt, "no permits available"),
705         }
706     }
707 }
708 
709 impl std::error::Error for TryAcquireError {}
710 
711 /// # Safety
712 ///
713 /// `Waiter` is forced to be !Unpin.
714 unsafe impl linked_list::Link for Waiter {
715     type Handle = NonNull<Waiter>;
716     type Target = Waiter;
717 
as_raw(handle: &Self::Handle) -> NonNull<Waiter>718     fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
719         *handle
720     }
721 
from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter>722     unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
723         ptr
724     }
725 
pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>>726     unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
727         Waiter::addr_of_pointers(target)
728     }
729 }
730