• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]
2 //! # Implementation Details.
3 //!
4 //! The semaphore is implemented using an intrusive linked list of waiters. An
5 //! atomic counter tracks the number of available permits. If the semaphore does
6 //! not contain the required number of permits, the task attempting to acquire
7 //! permits places its waker at the end of a queue. When new permits are made
8 //! available (such as by releasing an initial acquisition), they are assigned
9 //! to the task at the front of the queue, waking that task if its requested
10 //! number of permits is met.
11 //!
12 //! Because waiters are enqueued at the back of the linked list and dequeued
13 //! from the front, the semaphore is fair. Tasks trying to acquire large numbers
14 //! of permits at a time will always be woken eventually, even if many other
15 //! tasks are acquiring smaller numbers of permits. This means that in a
16 //! use-case like tokio's read-write lock, writers will not be starved by
17 //! readers.
18 use crate::loom::cell::UnsafeCell;
19 use crate::loom::sync::atomic::AtomicUsize;
20 use crate::loom::sync::{Mutex, MutexGuard};
21 use crate::util::linked_list::{self, LinkedList};
22 use crate::util::WakeList;
23 
24 use std::future::Future;
25 use std::marker::PhantomPinned;
26 use std::pin::Pin;
27 use std::ptr::NonNull;
28 use std::sync::atomic::Ordering::*;
29 use std::task::Poll::*;
30 use std::task::{Context, Poll, Waker};
31 use std::{cmp, fmt};
32 
/// An asynchronous counting semaphore which permits waiting on multiple permits at once.
pub(crate) struct Semaphore {
    /// The wait queue together with its `closed` flag, guarded by a mutex.
    waiters: Mutex<Waitlist>,
    /// The current number of available permits in the semaphore.
    ///
    /// The count is stored shifted left by `PERMIT_SHIFT`; the lowest bit is
    /// the `CLOSED` flag set by `close`.
    permits: AtomicUsize,
}
39 
/// State protected by the `Semaphore::waiters` mutex.
struct Waitlist {
    /// Intrusive list of the waiters that are still missing permits.
    queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
    /// Set to `true` by `Semaphore::close`; `poll_acquire` returns an error
    /// instead of enqueueing once this is set.
    closed: bool,
}
44 
/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// Both variants are field-less, so equality is total and the type
/// implements `Eq` in addition to `PartialEq`.
///
/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
#[derive(Debug, PartialEq, Eq)]
pub enum TryAcquireError {
    /// The semaphore has been [closed] and cannot issue new permits.
    ///
    /// [closed]: crate::sync::Semaphore::close
    Closed,

    /// The semaphore has no available permits.
    NoPermits,
}
/// Error returned from the [`Semaphore::acquire`] function.
///
/// An `acquire` operation can only fail if the semaphore has been
/// [closed].
///
/// The wrapped `()` field is private, so values of this type can only be
/// constructed inside this module.
///
/// [closed]: crate::sync::Semaphore::close
/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
#[derive(Debug)]
pub struct AcquireError(());
67 
/// Future created by `Semaphore::acquire`.
///
/// It resolves to `Ok(())` once `num_permits` permits have been obtained, or
/// to an `AcquireError` if the semaphore is closed.
pub(crate) struct Acquire<'a> {
    /// Wait-queue entry for this future; it is linked into the semaphore's
    /// queue while the future is waiting for permits.
    node: Waiter,
    /// The semaphore the permits are acquired from.
    semaphore: &'a Semaphore,
    /// Total number of permits requested by this future.
    num_permits: u32,
    /// `true` after a poll returned `Pending` (the node was enqueued); reset
    /// to `false` once the permits have been acquired. `Drop` only unlinks
    /// the node when this is set.
    queued: bool,
}
74 
/// An entry in the wait queue.
struct Waiter {
    /// The current state of the waiter.
    ///
    /// This is the number of permits the waiter still requires: it starts at
    /// the requested count (`Waiter::new`) and is decremented by
    /// `assign_permits` until it reaches zero.
    ///
    /// NOTE(review): earlier documentation also described a flag value meaning
    /// "not yet queued"; no such flag is used by the code in this file.
    state: AtomicUsize,

    /// The waker to notify the task awaiting permits.
    ///
    /// # Safety
    ///
    /// This may only be accessed while the wait queue is locked.
    waker: UnsafeCell<Option<Waker>>,

    /// Intrusive linked-list pointers.
    ///
    /// # Safety
    ///
    /// This may only be accessed while the wait queue is locked.
    ///
    /// TODO: Ideally, we would be able to use loom to enforce that
    /// this isn't accessed concurrently. However, it is difficult to
    /// use a `UnsafeCell` here, since the `Link` trait requires _returning_
    /// references to `Pointers`, and `UnsafeCell` requires that checked access
    /// take place inside a closure. We should consider changing `Pointers` to
    /// use `UnsafeCell` internally.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`: the node's address is stored in the intrusive
    /// list, so it must not move while linked.
    _p: PhantomPinned,
}
107 
108 impl Semaphore {
109     /// The maximum number of permits which a semaphore can hold.
110     ///
111     /// Note that this reserves three bits of flags in the permit counter, but
112     /// we only actually use one of them. However, the previous semaphore
113     /// implementation used three bits, so we will continue to reserve them to
114     /// avoid a breaking change if additional flags need to be added in the
115     /// future.
116     pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3;
117     const CLOSED: usize = 1;
118     // The least-significant bit in the number of permits is reserved to use
119     // as a flag indicating that the semaphore has been closed. Consequently
120     // PERMIT_SHIFT is used to leave that bit for that purpose.
121     const PERMIT_SHIFT: usize = 1;
122 
123     /// Creates a new semaphore with the initial number of permits
124     ///
125     /// Maximum number of permits on 32-bit platforms is `1<<29`.
new(permits: usize) -> Self126     pub(crate) fn new(permits: usize) -> Self {
127         assert!(
128             permits <= Self::MAX_PERMITS,
129             "a semaphore may not have more than MAX_PERMITS permits ({})",
130             Self::MAX_PERMITS
131         );
132         Self {
133             permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
134             waiters: Mutex::new(Waitlist {
135                 queue: LinkedList::new(),
136                 closed: false,
137             }),
138         }
139     }
140 
141     /// Creates a new semaphore with the initial number of permits.
142     ///
143     /// Maximum number of permits on 32-bit platforms is `1<<29`.
144     ///
145     /// If the specified number of permits exceeds the maximum permit amount
146     /// Then the value will get clamped to the maximum number of permits.
147     #[cfg(all(feature = "parking_lot", not(all(loom, test))))]
const_new(mut permits: usize) -> Self148     pub(crate) const fn const_new(mut permits: usize) -> Self {
149         // NOTE: assertions and by extension panics are still being worked on: https://github.com/rust-lang/rust/issues/74925
150         // currently we just clamp the permit count when it exceeds the max
151         permits &= Self::MAX_PERMITS;
152 
153         Self {
154             permits: AtomicUsize::new(permits << Self::PERMIT_SHIFT),
155             waiters: Mutex::const_new(Waitlist {
156                 queue: LinkedList::new(),
157                 closed: false,
158             }),
159         }
160     }
161 
162     /// Returns the current number of available permits.
available_permits(&self) -> usize163     pub(crate) fn available_permits(&self) -> usize {
164         self.permits.load(Acquire) >> Self::PERMIT_SHIFT
165     }
166 
167     /// Adds `added` new permits to the semaphore.
168     ///
169     /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
release(&self, added: usize)170     pub(crate) fn release(&self, added: usize) {
171         if added == 0 {
172             return;
173         }
174 
175         // Assign permits to the wait queue
176         self.add_permits_locked(added, self.waiters.lock());
177     }
178 
179     /// Closes the semaphore. This prevents the semaphore from issuing new
180     /// permits and notifies all pending waiters.
close(&self)181     pub(crate) fn close(&self) {
182         let mut waiters = self.waiters.lock();
183         // If the semaphore's permits counter has enough permits for an
184         // unqueued waiter to acquire all the permits it needs immediately,
185         // it won't touch the wait list. Therefore, we have to set a bit on
186         // the permit counter as well. However, we must do this while
187         // holding the lock --- otherwise, if we set the bit and then wait
188         // to acquire the lock we'll enter an inconsistent state where the
189         // permit counter is closed, but the wait list is not.
190         self.permits.fetch_or(Self::CLOSED, Release);
191         waiters.closed = true;
192         while let Some(mut waiter) = waiters.queue.pop_back() {
193             let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
194             if let Some(waker) = waker {
195                 waker.wake();
196             }
197         }
198     }
199 
200     /// Returns true if the semaphore is closed.
is_closed(&self) -> bool201     pub(crate) fn is_closed(&self) -> bool {
202         self.permits.load(Acquire) & Self::CLOSED == Self::CLOSED
203     }
204 
try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError>205     pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
206         assert!(
207             num_permits as usize <= Self::MAX_PERMITS,
208             "a semaphore may not have more than MAX_PERMITS permits ({})",
209             Self::MAX_PERMITS
210         );
211         let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
212         let mut curr = self.permits.load(Acquire);
213         loop {
214             // Has the semaphore closed?
215             if curr & Self::CLOSED == Self::CLOSED {
216                 return Err(TryAcquireError::Closed);
217             }
218 
219             // Are there enough permits remaining?
220             if curr < num_permits {
221                 return Err(TryAcquireError::NoPermits);
222             }
223 
224             let next = curr - num_permits;
225 
226             match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
227                 Ok(_) => return Ok(()),
228                 Err(actual) => curr = actual,
229             }
230         }
231     }
232 
    /// Returns a future that resolves once `num_permits` permits have been
    /// acquired, or with an `AcquireError` if the semaphore is closed.
    ///
    /// This only constructs the future; no permits are taken until it is
    /// polled.
    pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> {
        Acquire::new(self, num_permits)
    }
236 
    /// Release `rem` permits to the semaphore's wait list, starting from the
    /// end of the queue.
    ///
    /// If `rem` exceeds the number of permits needed by the wait list, the
    /// remainder are assigned back to the semaphore.
    ///
    /// `waiters` is the already-held guard for `self.waiters`; it is consumed.
    /// Wakers are collected while the lock is held and only invoked after the
    /// lock has been released.
    ///
    /// # Panics
    ///
    /// Panics if the permits handed back to the counter exceed `MAX_PERMITS`,
    /// or would make the counter exceed `MAX_PERMITS`.
    fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
        let mut wakers = WakeList::new();
        // The caller's guard is used for the first pass; later passes re-lock.
        let mut lock = Some(waiters);
        let mut is_empty = false;
        while rem > 0 {
            let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock());
            // Stop collecting once the wake list cannot take another waker; the
            // outer loop then wakes the batch and starts over with a fresh lock.
            'inner: while wakers.can_push() {
                // Was the waiter assigned enough permits to wake it?
                match waiters.queue.last() {
                    Some(waiter) => {
                        // `false` means the waiter still needs permits, which
                        // implies `rem` has been used up entirely.
                        if !waiter.assign_permits(&mut rem) {
                            break 'inner;
                        }
                    }
                    None => {
                        is_empty = true;
                        // If we assigned permits to all the waiters in the queue, and there are
                        // still permits left over, assign them back to the semaphore.
                        break 'inner;
                    }
                };
                // The waiter at the end of the queue is fully satisfied: unlink
                // it and remember its waker.
                let mut waiter = waiters.queue.pop_back().unwrap();
                // Safety: the wait list is locked, so the waker may be accessed.
                if let Some(waker) =
                    unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) }
                {
                    wakers.push(waker);
                }
            }

            if rem > 0 && is_empty {
                let permits = rem;
                assert!(
                    permits <= Self::MAX_PERMITS,
                    "cannot add more than MAX_PERMITS permits ({})",
                    Self::MAX_PERMITS
                );
                // Nobody is waiting any more: put the leftovers into the counter.
                let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
                let prev = prev >> Self::PERMIT_SHIFT;
                assert!(
                    prev + permits <= Self::MAX_PERMITS,
                    "number of added permits ({}) would overflow MAX_PERMITS ({})",
                    rem,
                    Self::MAX_PERMITS
                );
                rem = 0;
            }

            drop(waiters); // release the lock

            // Wake outside the lock so woken tasks do not contend on it.
            wakers.wake_all();
        }

        assert_eq!(rem, 0);
    }
296 
    /// Drives one poll of an `Acquire` future.
    ///
    /// * `num_permits` - total number of permits the future asked for.
    /// * `node` - the future's wait-queue entry.
    /// * `queued` - whether `node` was already enqueued by a previous poll; if
    ///   so, the number of permits still needed is read from `node.state`
    ///   instead of `num_permits`.
    ///
    /// Returns `Ready(Ok(()))` once all permits have been obtained,
    /// `Ready(Err(_))` if the semaphore is closed, and otherwise `Pending`
    /// after storing the task's waker in `node` and, if it was not queued
    /// yet, linking `node` into the wait queue.
    fn poll_acquire(
        &self,
        cx: &mut Context<'_>,
        num_permits: u32,
        node: Pin<&mut Waiter>,
        queued: bool,
    ) -> Poll<Result<(), AcquireError>> {
        // Number of permits (unshifted) taken from the counter by this call.
        let mut acquired = 0;

        // Permits still required, in the counter's shifted encoding.
        let needed = if queued {
            node.state.load(Acquire) << Self::PERMIT_SHIFT
        } else {
            (num_permits as usize) << Self::PERMIT_SHIFT
        };

        let mut lock = None;
        // First, try to take the requested number of permits from the
        // semaphore.
        let mut curr = self.permits.load(Acquire);
        let mut waiters = loop {
            // Has the semaphore closed?
            if curr & Self::CLOSED > 0 {
                return Ready(Err(AcquireError::closed()));
            }

            let mut remaining = 0;
            let total = curr
                .checked_add(acquired)
                .expect("number of permits must not overflow");
            // Either take exactly what is needed, or drain the counter and
            // record how much is still missing.
            let (next, acq) = if total >= needed {
                let next = curr - (needed - acquired);
                (next, needed >> Self::PERMIT_SHIFT)
            } else {
                remaining = (needed - acquired) - curr;
                (0, curr >> Self::PERMIT_SHIFT)
            };

            if remaining > 0 && lock.is_none() {
                // No permits were immediately available, so this permit will
                // (probably) need to wait. We'll need to acquire a lock on the
                // wait queue before continuing. We need to do this _before_ the
                // CAS that sets the new value of the semaphore's `permits`
                // counter. Otherwise, if we subtract the permits and then
                // acquire the lock, we might miss additional permits being
                // added while waiting for the lock.
                lock = Some(self.waiters.lock());
            }

            match self.permits.compare_exchange(curr, next, AcqRel, Acquire) {
                Ok(_) => {
                    acquired += acq;
                    if remaining == 0 {
                        if !queued {
                            // Fast path: everything was available and the node
                            // was never linked, so the wait list is not touched.
                            return Ready(Ok(()));
                        } else if lock.is_none() {
                            // The node is queued, so its state has to be
                            // updated under the wait-list lock below.
                            break self.waiters.lock();
                        }
                    }
                    break lock.expect("lock must be acquired before waiting");
                }
                // The counter changed under us: retry with the observed value.
                Err(actual) => curr = actual,
            }
        };

        // `close` sets this flag while holding the lock we now own.
        if waiters.closed {
            return Ready(Err(AcquireError::closed()));
        }

        // Credit the permits taken above to the node. If that satisfies it,
        // whatever is left in `acquired` is handed back (the call also consumes
        // and thereby releases the guard).
        if node.assign_permits(&mut acquired) {
            self.add_permits_locked(acquired, waiters);
            return Ready(Ok(()));
        }

        // An unsatisfied node has absorbed every permit that was taken.
        assert_eq!(acquired, 0);

        // Otherwise, register the waker & enqueue the node.
        node.waker.with_mut(|waker| {
            // Safety: the wait list is locked, so we may modify the waker.
            let waker = unsafe { &mut *waker };
            // Do we need to register the new waker?
            if waker
                .as_ref()
                .map(|waker| !waker.will_wake(cx.waker()))
                .unwrap_or(true)
            {
                *waker = Some(cx.waker().clone());
            }
        });

        // If the waiter is not already in the wait queue, enqueue it.
        if !queued {
            // The list stores a raw pointer to the pinned node; the `Drop`
            // impl of `Acquire` unlinks it before the node goes away.
            let node = unsafe {
                let node = Pin::into_inner_unchecked(node) as *mut _;
                NonNull::new_unchecked(node)
            };

            // New waiters go in at the front; permits are handed out from the
            // back (`last` / `pop_back`).
            waiters.queue.push_front(node);
        }

        Pending
    }
398 }
399 
400 impl fmt::Debug for Semaphore {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result401     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
402         fmt.debug_struct("Semaphore")
403             .field("permits", &self.available_permits())
404             .finish()
405     }
406 }
407 
408 impl Waiter {
new(num_permits: u32) -> Self409     fn new(num_permits: u32) -> Self {
410         Waiter {
411             waker: UnsafeCell::new(None),
412             state: AtomicUsize::new(num_permits as usize),
413             pointers: linked_list::Pointers::new(),
414             _p: PhantomPinned,
415         }
416     }
417 
418     /// Assign permits to the waiter.
419     ///
420     /// Returns `true` if the waiter should be removed from the queue
assign_permits(&self, n: &mut usize) -> bool421     fn assign_permits(&self, n: &mut usize) -> bool {
422         let mut curr = self.state.load(Acquire);
423         loop {
424             let assign = cmp::min(curr, *n);
425             let next = curr - assign;
426             match self.state.compare_exchange(curr, next, AcqRel, Acquire) {
427                 Ok(_) => {
428                     *n -= assign;
429                     return next == 0;
430                 }
431                 Err(actual) => curr = actual,
432             }
433         }
434     }
435 }
436 
437 impl Future for Acquire<'_> {
438     type Output = Result<(), AcquireError>;
439 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>440     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
441         // First, ensure the current task has enough budget to proceed.
442         let coop = ready!(crate::coop::poll_proceed(cx));
443 
444         let (node, semaphore, needed, queued) = self.project();
445 
446         match semaphore.poll_acquire(cx, needed, node, *queued) {
447             Pending => {
448                 *queued = true;
449                 Pending
450             }
451             Ready(r) => {
452                 coop.made_progress();
453                 r?;
454                 *queued = false;
455                 Ready(Ok(()))
456             }
457         }
458     }
459 }
460 
461 impl<'a> Acquire<'a> {
new(semaphore: &'a Semaphore, num_permits: u32) -> Self462     fn new(semaphore: &'a Semaphore, num_permits: u32) -> Self {
463         Self {
464             node: Waiter::new(num_permits),
465             semaphore,
466             num_permits,
467             queued: false,
468         }
469     }
470 
project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool)471     fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Semaphore, u32, &mut bool) {
472         fn is_unpin<T: Unpin>() {}
473         unsafe {
474             // Safety: all fields other than `node` are `Unpin`
475 
476             is_unpin::<&Semaphore>();
477             is_unpin::<&mut bool>();
478             is_unpin::<u32>();
479 
480             let this = self.get_unchecked_mut();
481             (
482                 Pin::new_unchecked(&mut this.node),
483                 this.semaphore,
484                 this.num_permits,
485                 &mut this.queued,
486             )
487         }
488     }
489 }
490 
491 impl Drop for Acquire<'_> {
drop(&mut self)492     fn drop(&mut self) {
493         // If the future is completed, there is no node in the wait list, so we
494         // can skip acquiring the lock.
495         if !self.queued {
496             return;
497         }
498 
499         // This is where we ensure safety. The future is being dropped,
500         // which means we must ensure that the waiter entry is no longer stored
501         // in the linked list.
502         let mut waiters = self.semaphore.waiters.lock();
503 
504         // remove the entry from the list
505         let node = NonNull::from(&mut self.node);
506         // Safety: we have locked the wait list.
507         unsafe { waiters.queue.remove(node) };
508 
509         let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire);
510         if acquired_permits > 0 {
511             self.semaphore.add_permits_locked(acquired_permits, waiters);
512         }
513     }
514 }
515 
// Safety: the `Acquire` future is not `Sync` automatically because it contains
// a `Waiter`, which, in turn, contains an `UnsafeCell`. However, the future
// itself only accesses the `UnsafeCell` when it is borrowed mutably (either in
// `poll` or in `drop`); the semaphore's own accesses to a queued waiter's
// waker (`close`, `add_permits_locked`) happen while the wait-list mutex is
// held. Therefore, it is safe (although not particularly _useful_) for the
// future to be borrowed immutably across threads.
unsafe impl Sync for Acquire<'_> {}
522 
523 // ===== impl AcquireError ====
524 
525 impl AcquireError {
closed() -> AcquireError526     fn closed() -> AcquireError {
527         AcquireError(())
528     }
529 }
530 
531 impl fmt::Display for AcquireError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result532     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
533         write!(fmt, "semaphore closed")
534     }
535 }
536 
537 impl std::error::Error for AcquireError {}
538 
539 // ===== impl TryAcquireError =====
540 
541 impl TryAcquireError {
542     /// Returns `true` if the error was caused by a closed semaphore.
543     #[allow(dead_code)] // may be used later!
is_closed(&self) -> bool544     pub(crate) fn is_closed(&self) -> bool {
545         matches!(self, TryAcquireError::Closed)
546     }
547 
548     /// Returns `true` if the error was caused by calling `try_acquire` on a
549     /// semaphore with no available permits.
550     #[allow(dead_code)] // may be used later!
is_no_permits(&self) -> bool551     pub(crate) fn is_no_permits(&self) -> bool {
552         matches!(self, TryAcquireError::NoPermits)
553     }
554 }
555 
556 impl fmt::Display for TryAcquireError {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result557     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
558         match self {
559             TryAcquireError::Closed => write!(fmt, "semaphore closed"),
560             TryAcquireError::NoPermits => write!(fmt, "no permits available"),
561         }
562     }
563 }
564 
565 impl std::error::Error for TryAcquireError {}
566 
/// Lets `Waiter` be stored in the intrusive `LinkedList` used as the wait
/// queue.
///
/// # Safety
///
/// `Waiter` is forced to be !Unpin.
unsafe impl linked_list::Link for Waiter {
    // XXX: ideally, we would be able to use `Pin` here, to enforce the
    // invariant that list entries may not move while in the list. However, we
    // can't do this currently, as using `Pin<&'a mut Waiter>` as the `Handle`
    // type would require `Semaphore` to be generic over a lifetime. We can't
    // use `Pin<*mut Waiter>`, as raw pointers are `Unpin` regardless of whether
    // or not they dereference to an `!Unpin` target.
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    /// The handle already is the raw pointer, so it is simply copied out.
    fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
        *handle
    }

    /// Handle and raw pointer are the same type; the pointer is returned
    /// unchanged.
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    /// Returns a pointer to the `pointers` field embedded in the waiter that
    /// `target` points to.
    unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        NonNull::from(&mut target.as_mut().pointers)
    }
}
592