• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
//! Thread parking for Darwin-based systems.
//!
//! Darwin actually has futex syscalls (`__ulock_wait`/`__ulock_wake`), but they
//! cannot be used in `std` because they are non-public (their use will lead to
//! rejection from the App Store) and because they are only available starting
//! with macOS version 10.12, even though the minimum target version is 10.7.
//!
//! Therefore, we need to look for other synchronization primitives. Luckily, Darwin
//! supports semaphores, which allow us to implement the behaviour we need with
//! only one primitive (as opposed to a mutex-condvar pair). We use the semaphore
//! provided by libdispatch, as the underlying Mach semaphore is only dubiously
//! public.
13 
14 use crate::pin::Pin;
15 use crate::sync::atomic::{
16     AtomicI8,
17     Ordering::{Acquire, Release},
18 };
19 use crate::time::Duration;
20 
/// Opaque handle to a libdispatch semaphore object.
// The name mirrors the C typedef, hence the lint exception.
#[allow(non_camel_case_types)]
type dispatch_semaphore_t = *mut crate::ffi::c_void;
/// Point in time as understood by libdispatch.
// The name mirrors the C typedef, hence the lint exception.
#[allow(non_camel_case_types)]
type dispatch_time_t = u64;

/// Base value passed to `dispatch_time` to build a timeout relative to now.
const DISPATCH_TIME_NOW: dispatch_time_t = 0;
/// Timeout value (all bits set) used for waits that should never time out.
const DISPATCH_TIME_FOREVER: dispatch_time_t = !0;
26 
27 // Contained in libSystem.dylib, which is linked by default.
28 extern "C" {
dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t29     fn dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t;
dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t30     fn dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t;
dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize31     fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize;
dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize32     fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize;
dispatch_release(object: *mut crate::ffi::c_void)33     fn dispatch_release(object: *mut crate::ffi::c_void);
34 }
35 
// Parker states. The values are chosen so that a single `fetch_sub(1)` in
// `park` turns NOTIFIED into EMPTY and EMPTY into PARKED.

/// No notification is pending and no thread is parked.
const EMPTY: i8 = 0;
/// A notification is pending; the next park returns immediately.
const NOTIFIED: i8 = 1;
/// A thread is parked (or about to park) and waits for a notification.
const PARKED: i8 = -1;
39 
40 pub struct Parker {
41     semaphore: dispatch_semaphore_t,
42     state: AtomicI8,
43 }
44 
45 unsafe impl Sync for Parker {}
46 unsafe impl Send for Parker {}
47 
48 impl Parker {
new_in_place(parker: *mut Parker)49     pub unsafe fn new_in_place(parker: *mut Parker) {
50         let semaphore = dispatch_semaphore_create(0);
51         assert!(
52             !semaphore.is_null(),
53             "failed to create dispatch semaphore for thread synchronization"
54         );
55         parker.write(Parker { semaphore, state: AtomicI8::new(EMPTY) })
56     }
57 
58     // Does not need `Pin`, but other implementation do.
park(self: Pin<&Self>)59     pub unsafe fn park(self: Pin<&Self>) {
60         // The semaphore counter must be zero at this point, because unparking
61         // threads will not actually increase it until we signalled that we
62         // are waiting.
63 
64         // Change NOTIFIED to EMPTY and EMPTY to PARKED.
65         if self.state.fetch_sub(1, Acquire) == NOTIFIED {
66             return;
67         }
68 
69         // Another thread may increase the semaphore counter from this point on.
70         // If it is faster than us, we will decrement it again immediately below.
71         // If we are faster, we wait.
72 
73         // Ensure that the semaphore counter has actually been decremented, even
74         // if the call timed out for some reason.
75         while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}
76 
77         // At this point, the semaphore counter is zero again.
78 
79         // We were definitely woken up, so we don't need to check the state.
80         // Still, we need to reset the state using a swap to observe the state
81         // change with acquire ordering.
82         self.state.swap(EMPTY, Acquire);
83     }
84 
85     // Does not need `Pin`, but other implementation do.
park_timeout(self: Pin<&Self>, dur: Duration)86     pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
87         if self.state.fetch_sub(1, Acquire) == NOTIFIED {
88             return;
89         }
90 
91         let nanos = dur.as_nanos().try_into().unwrap_or(i64::MAX);
92         let timeout = dispatch_time(DISPATCH_TIME_NOW, nanos);
93 
94         let timeout = dispatch_semaphore_wait(self.semaphore, timeout) != 0;
95 
96         let state = self.state.swap(EMPTY, Acquire);
97         if state == NOTIFIED && timeout {
98             // If the state was NOTIFIED but semaphore_wait returned without
99             // decrementing the count because of a timeout, it means another
100             // thread is about to call semaphore_signal. We must wait for that
101             // to happen to ensure the semaphore count is reset.
102             while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}
103         } else {
104             // Either a timeout occurred and we reset the state before any thread
105             // tried to wake us up, or we were woken up and reset the state,
106             // making sure to observe the state change with acquire ordering.
107             // Either way, the semaphore counter is now zero again.
108         }
109     }
110 
111     // Does not need `Pin`, but other implementation do.
unpark(self: Pin<&Self>)112     pub fn unpark(self: Pin<&Self>) {
113         let state = self.state.swap(NOTIFIED, Release);
114         if state == PARKED {
115             unsafe {
116                 dispatch_semaphore_signal(self.semaphore);
117             }
118         }
119     }
120 }
121 
122 impl Drop for Parker {
drop(&mut self)123     fn drop(&mut self) {
124         // SAFETY:
125         // We always ensure that the semaphore count is reset, so this will
126         // never cause an exception.
127         unsafe {
128             dispatch_release(self.semaphore);
129         }
130     }
131 }
132