#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
//! # Implementation Details
//!
//! The semaphore is implemented using an intrusive linked list of waiters. An
//! atomic counter tracks the number of available permits. If the semaphore does
//! not contain the required number of permits, the task attempting to acquire
//! permits places its waker at the end of a queue. When new permits are made
//! available (such as by releasing an initial acquisition), they are assigned
//! to the task at the front of the queue, waking that task if its requested
//! number of permits is met.
//!
//! Because waiters are enqueued at the back of the linked list and dequeued
//! from the front, the semaphore is fair. Tasks trying to acquire large numbers
//! of permits at a time will always be woken eventually, even if many other
//! tasks are acquiring smaller numbers of permits. This means that in a
//! use-case like tokio's read-write lock, writers will not be starved by
//! readers.
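//!
//! A minimal sketch of this fairness property, in terms of the public
//! `tokio::sync::Semaphore` wrapper built on this implementation (not run as
//! a doctest since this module is internal; the ordering in which the two
//! spawned tasks reach the semaphore is assumed for illustration):
//!
//! ```ignore
//! use std::sync::Arc;
//! use tokio::sync::Semaphore;
//!
//! async fn fairness_demo() {
//!     let sem = Arc::new(Semaphore::new(1));
//!
//!     // Task A wants 2 permits: it takes the single available permit into
//!     // its queued waiter and waits at the front of the queue for 1 more.
//!     let a = tokio::spawn({
//!         let sem = sem.clone();
//!         async move { drop(sem.acquire_many(2).await.unwrap()) }
//!     });
//!
//!     // Task B wants 1 permit, but the counter is now empty, so it queues
//!     // *behind* A rather than overtaking the larger request.
//!     let b = tokio::spawn({
//!         let sem = sem.clone();
//!         async move { drop(sem.acquire_many(1).await.unwrap()) }
//!     });
//!
//!     // Released permits are assigned to the front of the queue: A
//!     // completes first, then B.
//!     sem.add_permits(1);
//!     a.await.unwrap();
//!     b.await.unwrap();
//! }
//! ```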
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::{Mutex, MutexGuard};
use crate::util::linked_list::{self, LinkedList};
#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::util::trace;
use crate::util::WakeList;

use std::future::Future;
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::*;
use std::task::Poll::*;
use std::task::{Context, Poll, Waker};
use std::{cmp, fmt};

/// An asynchronous counting semaphore which allows waiting for multiple permits at once.
pub(crate) struct Semaphore {
    waiters: Mutex<Waitlist>,
    /// The current number of available permits in the semaphore.
    permits: AtomicUsize,
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    resource_span: tracing::Span,
}

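/// The state guarded by the semaphore's `waiters` mutex: the queue of waiting
/// tasks and a flag recording whether the semaphore has been closed.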
struct Waitlist {
    queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
    closed: bool,
}

/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
#[derive(Debug, PartialEq, Eq)]
pub enum TryAcquireError {
    /// The semaphore has been [closed] and cannot issue new permits.
    ///
    /// [closed]: crate::sync::Semaphore::close
    Closed,

    /// The semaphore has no available permits.
    NoPermits,
}

/// Error returned from the [`Semaphore::acquire`] function.
///
/// An `acquire` operation can only fail if the semaphore has been
/// [closed].
///
/// [closed]: crate::sync::Semaphore::close
/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
#[derive(Debug)]
pub struct AcquireError(());

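/// The future returned by [`Semaphore::acquire`].
///
/// If the future is dropped while its waiter is still queued, any permits
/// already assigned to the waiter are released back to the semaphore (see
/// the `Drop` impl below).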
pub(crate) struct Acquire<'a> {
    node: Waiter,
    semaphore: &'a Semaphore,
    num_permits: u32,
    queued: bool,
}

/// An entry in the wait queue.
struct Waiter {
    /// The current state of the waiter.
    ///
    /// This is either the number of remaining permits required by
    /// the waiter, or a flag indicating that the waiter is not yet queued.
    state: AtomicUsize,

    /// The waker to notify the task awaiting permits.
    ///
    /// # Safety
    ///
    /// This may only be accessed while the wait queue is locked.
    waker: UnsafeCell<Option<Waker>>,

    /// Intrusive linked-list pointers.
    ///
    /// # Safety
    ///
    /// This may only be accessed while the wait queue is locked.
    ///
    /// TODO: Ideally, we would be able to use loom to enforce that
    /// this isn't accessed concurrently. However, it is difficult to
    /// use an `UnsafeCell` here, since the `Link` trait requires _returning_
    /// references to `Pointers`, and `UnsafeCell` requires that checked access
    /// take place inside a closure. We should consider changing `Pointers` to
    /// use `UnsafeCell` internally.
    pointers: linked_list::Pointers<Waiter>,

    #[cfg(all(tokio_unstable, feature = "tracing"))]
    ctx: trace::AsyncOpTracingCtx,

    /// Should not be `Unpin`.
    _p: PhantomPinned,
}

generate_addr_of_methods! {
    impl<> Waiter {
        unsafe fn addr_of_pointers(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Waiter>> {
            &self.pointers
        }
    }
}

impl Semaphore {
    /// The maximum number of permits which a semaphore can hold.
    ///
    /// Note that this reserves three bits of flags in the permit counter, but
    /// we only actually use one of them. However, the previous semaphore
    /// implementation used three bits, so we will continue to reserve them to
    /// avoid a breaking change if additional flags need to be added in the
    /// future.
    pub(crate) const MAX_PERMITS: usize = usize::MAX >> 3;
    const CLOSED: usize = 1;
    // The least-significant bit in the number of permits is reserved as a
    // flag indicating that the semaphore has been closed. Consequently,
    // `PERMIT_SHIFT` is used to shift permit counts past that bit.
    const PERMIT_SHIFT: usize = 1;
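
    // As a worked example of this layout: a semaphore currently holding five
    // permits stores `5 << PERMIT_SHIFT`, i.e. `0b1010`; closing it sets the
    // low bit, giving `0b1011`, while `available_permits()` still reads 5.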

    /// Creates a new semaphore with the initial number of permits.
    ///
    /// Maximum number of permits on 32-bit platforms is `1 << 29`.
    pub(crate) fn new(permits: usize) -> Self {
        assert!(
            permits <= Self::MAX_PERMITS,
            "a semaphore may not have more than MAX_PERMITS permits ({})",
            Self::MAX_PERMITS
        );

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let resource_span = {
            let resource_span = tracing::trace_span!(
                "runtime.resource",
                concrete_type = "Semaphore",
                kind = "Sync",
                is_internal = true
            );

            resource_span.in_scope(|| {
                tracing::trace!(
                    target: "runtime::resource::state_update",
                    permits = permits,
                    permits.op = "override",
                )
            });
            resource_span
        };

        Self {
            permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
            waiters: Mutex::new(Waitlist {
                queue: LinkedList::new(),
                closed: false,
            }),
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span,
        }
    }

    /// Creates a new semaphore with the initial number of permits.
    ///
    /// Maximum number of permits on 32-bit platforms is `1 << 29`.
    #[cfg(not(all(loom, test)))]
    pub(crate) const fn const_new(permits: usize) -> Self {
        assert!(permits <= Self::MAX_PERMITS);

        Self {
            permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
            waiters: Mutex::const_new(Waitlist {
                queue: LinkedList::new(),
                closed: false,
            }),
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: tracing::Span::none(),
        }
    }

    /// Creates a new closed semaphore with 0 permits.
    pub(crate) fn new_closed() -> Self {
        Self {
            permits: AtomicUsize::new(Self::CLOSED),
            waiters: Mutex::new(Waitlist {
                queue: LinkedList::new(),
                closed: true,
            }),
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            resource_span: tracing::Span::none(),
        }
    }

    /// Returns the current number of available permits.
    pub(crate) fn available_permits(&self) -> usize {
        self.permits.load(Acquire) >> Self::PERMIT_SHIFT
    }

    /// Adds `added` new permits to the semaphore.
    ///
    /// The maximum number of permits is `usize::MAX >> 3`, and this function
    /// will panic if the limit is exceeded.
    pub(crate) fn release(&self, added: usize) {
        if added == 0 {
            return;
        }

        // Assign permits to the wait queue
        self.add_permits_locked(added, self.waiters.lock());
    }

    /// Closes the semaphore. This prevents the semaphore from issuing new
    /// permits and notifies all pending waiters.
    pub(crate) fn close(&self) {
        let mut waiters = self.waiters.lock();
        // If the semaphore's permits counter has enough permits for an
        // unqueued waiter to acquire all the permits it needs immediately,
        // it won't touch the wait list. Therefore, we have to set a bit on
        // the permit counter as well. However, we must do this while
        // holding the lock --- otherwise, if we set the bit and then wait
        // to acquire the lock we'll enter an inconsistent state where the
        // permit counter is closed, but the wait list is not.
        self.permits.fetch_or(Self::CLOSED, Release);
        waiters.closed = true;
        while let Some(mut waiter) = waiters.queue.pop_back() {
            let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
            if let Some(waker) = waker {
                waker.wake();
            }
        }
    }

    /// Returns true if the semaphore is closed.
    pub(crate) fn is_closed(&self) -> bool {
        self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
    }

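    /// Attempts to acquire `num_permits` without waiting, failing if the
    /// semaphore has been closed or does not currently hold enough permits.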
    pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
        assert!(
            num_permits as usize <= Self::MAX_PERMITS,
            "a semaphore may not have more than MAX_PERMITS permits ({})",
            Self::MAX_PERMITS
        );
        let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
        let mut curr = self.permits.load(Acquire);
        loop {
            // Has the semaphore closed?
            if curr & Self::CLOSED == Self::CLOSED {
                return Err(TryAcquireError::Closed);
            }

            // Are there enough permits remaining?
            if curr < num_permits {
                return Err(TryAcquireError::NoPermits);
            }

            let next = curr - num_permits;

            match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
                Ok(_) => {
                    // TODO: Instrument once issue has been solved
                    return Ok(());
                }
                Err(actual) => curr = actual,
            }
        }
    }

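    /// Returns a future that acquires `num_permits` from the semaphore,
    /// queuing a waiter if they are not all immediately available.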
    pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> {
        Acquire::new(self, num_permits)
    }

    /// Release `rem` permits to the semaphore's wait list, starting from the
    /// end of the queue (the oldest waiters).
    ///
    /// If `rem` exceeds the number of permits needed by the wait list, the
    /// remainder is assigned back to the semaphore.
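    ///
    /// For example, releasing 4 permits to a queue whose oldest waiter still
    /// needs 3 wakes that waiter and leaves 1 permit for the next one in
    /// line (or returns it to the counter if the queue is now empty).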
    fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
        let mut wakers = WakeList::new();
        let mut lock = Some(waiters);
        let mut is_empty = false;
        while rem > 0 {
            let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
            'inner: while wakers.can_push() {
                // Was the waiter assigned enough permits to wake it?
                match waiters.queue.last() {
                    Some(waiter) => {
                        if !waiter.assign_permits(&mut rem) {
                            break 'inner;
                        }
                    }
                    None => {
                        is_empty = true;
                        // If we assigned permits to all the waiters in the queue, and there are
                        // still permits left over, assign them back to the semaphore.
                        break 'inner;
                    }
                };
                let mut waiter = waiters.queue.pop_back().unwrap();
                if let Some(waker) =
                    unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
                {
                    wakers.push(waker);
                }
            }

            if rem > 0 && is_empty {
                let permits = rem;
                assert!(
                    permits <= Self::MAX_PERMITS,
                    "cannot add more than MAX_PERMITS permits ({})",
                    Self::MAX_PERMITS
                );
                let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
                let prev = prev >> Self::PERMIT_SHIFT;
                assert!(
                    prev + permits <= Self::MAX_PERMITS,
                    "number of added permits ({}) would overflow MAX_PERMITS ({})",
                    rem,
                    Self::MAX_PERMITS
                );

                // add remaining permits back
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                self.resource_span.in_scope(|| {
                    tracing::trace!(
                        target: "runtime::resource::state_update",
                        permits = rem,
                        permits.op = "add",
                    )
                });

                rem = 0;
            }

            drop(waiters); // release the lock

            wakers.wake_all();
        }

        assert_eq!(rem, 0);
    }

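    /// Polls a single attempt to acquire `num_permits`.
    ///
    /// This first tries to take permits directly from the atomic counter; if
    /// not enough are available, the waiter `node` is (re)registered in the
    /// wait queue for the remainder. `queued` indicates whether `node` was
    /// already placed in the queue by an earlier poll.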
    fn poll_acquire(
        &self,
        cx: &mut Context<'_>,
        num_permits: u32,
        node: Pin<&mut Waiter>,
        queued: bool,
    ) -> Poll<Result<(), AcquireError>> {
        let mut acquired = 0;

        let needed = if queued {
            node.state.load(Acquire) << Self::PERMIT_SHIFT
        } else {
            (num_permits as usize) << Self::PERMIT_SHIFT
        };

        let mut lock = None;
        // First, try to take the requested number of permits from the
        // semaphore.
        let mut curr = self.permits.load(Acquire);
        let mut waiters = loop {
            // Has the semaphore closed?
            if curr & Self::CLOSED > 0 {
                return Ready(Err(AcquireError::closed()));
            }

            let mut remaining = 0;
            let total = curr
                .checked_add(acquired)
                .expect("number of permits must not overflow");
            let (next, acq) = if total >= needed {
                let next = curr - (needed - acquired);
                (next, needed >> Self::PERMIT_SHIFT)
            } else {
                remaining = (needed - acquired) - curr;
                (0, curr >> Self::PERMIT_SHIFT)
            };

            if remaining > 0 && lock.is_none() {
                // No permits were immediately available, so this task will
                // (probably) need to wait. We'll need to acquire a lock on the
                // wait queue before continuing. We need to do this _before_ the
                // CAS that sets the new value of the semaphore's `permits`
                // counter. Otherwise, if we subtract the permits and then
                // acquire the lock, we might miss additional permits being
                // added while waiting for the lock.
                lock = Some(self.waiters.lock());
            }

            match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
                Ok(_) => {
                    acquired += acq;
                    if remaining == 0 {
                        if !queued {
                            #[cfg(all(tokio_unstable, feature = "tracing"))]
                            self.resource_span.in_scope(|| {
                                tracing::trace!(
                                    target: "runtime::resource::state_update",
                                    permits = acquired,
                                    permits.op = "sub",
                                );
                                tracing::trace!(
                                    target: "runtime::resource::async_op::state_update",
                                    permits_obtained = acquired,
                                    permits.op = "add",
                                )
                            });

                            return Ready(Ok(()));
                        } else if lock.is_none() {
                            break self.waiters.lock();
                        }
                    }
                    break lock.expect("lock must be acquired before waiting");
                }
                Err(actual) => curr = actual,
            }
        };

        if waiters.closed {
            return Ready(Err(AcquireError::closed()));
        }

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        self.resource_span.in_scope(|| {
            tracing::trace!(
                target: "runtime::resource::state_update",
                permits = acquired,
                permits.op = "sub",
            )
        });

        if node.assign_permits(&mut acquired) {
            self.add_permits_locked(acquired, waiters);
            return Ready(Ok(()));
        }

        assert_eq!(acquired, 0);
        let mut old_waker = None;

        // Otherwise, register the waker & enqueue the node.
        node.waker.with_mut(|waker| {
            // Safety: the wait list is locked, so we may modify the waker.
            let waker = unsafe { &mut *waker };
            // Do we need to register the new waker?
            if waker
                .as_ref()
                .map(|waker| !waker.will_wake(cx.waker()))
                .unwrap_or(true)
            {
                old_waker = std::mem::replace(waker, Some(cx.waker().clone()));
            }
        });

        // If the waiter is not already in the wait queue, enqueue it.
        if !queued {
            let node = unsafe {
                let node = Pin::into_inner_unchecked(node) as *mut _;
                NonNull::new_unchecked(node)
            };

            waiters.queue.push_front(node);
        }
        drop(waiters);
        drop(old_waker);

        Pending
    }
}

impl fmt::Debug for Semaphore {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Semaphore")
            .field("permits", &self.available_permits())
            .finish()
    }
}

impl Waiter {
    fn new(
        num_permits: u32,
        #[cfg(all(tokio_unstable, feature = "tracing"))] ctx: trace::AsyncOpTracingCtx,
    ) -> Self {
        Waiter {
            waker: UnsafeCell::new(None),
            state: AtomicUsize::new(num_permits as usize),
            pointers: linked_list::Pointers::new(),
            #[cfg(all(tokio_unstable, feature = "tracing"))]
            ctx,
            _p: PhantomPinned,
        }
    }

    /// Assign permits to the waiter.
    ///
    /// Returns `true` if the waiter should be removed from the queue.
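    ///
    /// For example, a waiter still needing 3 permits given `*n == 5` takes
    /// all 3, leaves `*n == 2`, and returns `true`; given `*n == 2` it takes
    /// both, still needs 1, and returns `false`.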
    fn assign_permits(&self, n: &mut usize) -> bool {
        let mut curr = self.state.load(Acquire);
        loop {
            let assign = cmp::min(curr, *n);
            let next = curr - assign;
            match self.state.compare_exchange(curr, next, AcqRel, Acquire) {
                Ok(_) => {
                    *n -= assign;
                    #[cfg(all(tokio_unstable, feature = "tracing"))]
                    self.ctx.async_op_span.in_scope(|| {
                        tracing::trace!(
                            target: "runtime::resource::async_op::state_update",
                            permits_obtained = assign,
                            permits.op = "add",
                        );
                    });
                    return next == 0;
                }
                Err(actual) => curr = actual,
            }
        }
    }
}

impl Future for Acquire<'_> {
    type Output = Result<(), AcquireError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let _resource_span = self.node.ctx.resource_span.clone().entered();
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let _async_op_span = self.node.ctx.async_op_span.clone().entered();
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let _async_op_poll_span = self.node.ctx.async_op_poll_span.clone().entered();

        let (node, semaphore, needed, queued) = self.project();

        // First, ensure the current task has enough budget to proceed.
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let coop = ready!(trace_poll_op!(
            "poll_acquire",
            crate::runtime::coop::poll_proceed(cx),
        ));

        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
        let coop = ready!(crate::runtime::coop::poll_proceed(cx));

        let result = match semaphore.poll_acquire(cx, needed, node, *queued) {
            Pending => {
                *queued = true;
                Pending
            }
            Ready(r) => {
                coop.made_progress();
                r?;
                *queued = false;
                Ready(Ok(()))
            }
        };

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        return trace_poll_op!("poll_acquire", result);

        #[cfg(not(all(tokio_unstable, feature = "tracing")))]
        return result;
    }
}

impl<'a> Acquire<'a> {
    fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self {
        #[cfg(any(not(tokio_unstable), not(feature = "tracing")))]
        return Self {
            node: Waiter::new(num_permits),
            semaphore,
            num_permits,
            queued: false,
        };

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        return semaphore.resource_span.in_scope(|| {
            let async_op_span =
                tracing::trace_span!("runtime.resource.async_op", source = "Acquire::new");
            let async_op_poll_span = async_op_span.in_scope(|| {
                tracing::trace!(
                    target: "runtime::resource::async_op::state_update",
                    permits_requested = num_permits,
                    permits.op = "override",
                );

                tracing::trace!(
                    target: "runtime::resource::async_op::state_update",
                    permits_obtained = 0usize,
                    permits.op = "override",
                );

                tracing::trace_span!("runtime.resource.async_op.poll")
            });

            let ctx = trace::AsyncOpTracingCtx {
                async_op_span,
                async_op_poll_span,
                resource_span: semaphore.resource_span.clone(),
            };

            Self {
                node: Waiter::new(num_permits, ctx),
                semaphore,
                num_permits,
                queued: false,
            }
        });
    }

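    /// Manually projects the pinned future into its fields. Only `node` must
    /// remain pinned, since it may be linked into the intrusive wait list;
    /// the other fields are `Unpin` (asserted below) and may be borrowed
    /// directly.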
    fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) {
        fn is_unpin<T: Unpin>() {}
        unsafe {
            // Safety: all fields other than `node` are `Unpin`.

            is_unpin::<&Semaphore>();
            is_unpin::<&mut bool>();
            is_unpin::<u32>();

            let this = self.get_unchecked_mut();
            (
                Pin::new_unchecked(&mut this.node),
                this.semaphore,
                this.num_permits,
                &mut this.queued,
            )
        }
    }
}

impl Drop for Acquire<'_> {
    fn drop(&mut self) {
        // If the future is completed, there is no node in the wait list, so we
        // can skip acquiring the lock.
        if !self.queued {
            return;
        }

        // This is where we ensure safety. The future is being dropped,
        // which means we must ensure that the waiter entry is no longer stored
        // in the linked list.
        let mut waiters = self.semaphore.waiters.lock();

        // Remove the entry from the list.
        let node = NonNull::from(&mut self.node);
        // Safety: we have locked the wait list.
        unsafe { waiters.queue.remove(node) };

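        // Return any permits that were already assigned to this waiter but
        // never observed by the caller (the future did not complete), so
        // they are not leaked.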
        let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire);
        if acquired_permits > 0 {
            self.semaphore.add_permits_locked(acquired_permits, waiters);
        }
    }
}

// Safety: the `Acquire` future is not `Sync` automatically because it contains
// a `Waiter`, which, in turn, contains an `UnsafeCell`. However, the
// `UnsafeCell` is only accessed when the future is borrowed mutably (either in
// `poll` or in `drop`). Therefore, it is safe (although not particularly
// _useful_) for the future to be borrowed immutably across threads.
unsafe impl Sync for Acquire<'_> {}

// ===== impl AcquireError =====

impl AcquireError {
    fn closed() -> AcquireError {
        AcquireError(())
    }
}

impl fmt::Display for AcquireError {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "semaphore closed")
    }
}

impl std::error::Error for AcquireError {}

// ===== impl TryAcquireError =====

impl TryAcquireError {
    /// Returns `true` if the error was caused by a closed semaphore.
    #[allow(dead_code)] // may be used later!
    pub(crate) fn is_closed(&self) -> bool {
        matches!(self, TryAcquireError::Closed)
    }

    /// Returns `true` if the error was caused by calling `try_acquire` on a
    /// semaphore with no available permits.
    #[allow(dead_code)] // may be used later!
    pub(crate) fn is_no_permits(&self) -> bool {
        matches!(self, TryAcquireError::NoPermits)
    }
}

impl fmt::Display for TryAcquireError {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TryAcquireError::Closed => write!(fmt, "semaphore closed"),
            TryAcquireError::NoPermits => write!(fmt, "no permits available"),
        }
    }
}

impl std::error::Error for TryAcquireError {}

/// # Safety
///
/// `Waiter` is forced to be `!Unpin`.
unsafe impl linked_list::Link for Waiter {
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
        *handle
    }

    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    unsafe fn pointers(target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        Waiter::addr_of_pointers(target)
    }
}
741