1 //! Thread parking for Darwin-based systems. 2 //! 3 //! Darwin actually has futex syscalls (`__ulock_wait`/`__ulock_wake`), but they 4 //! cannot be used in `std` because they are non-public (their use will lead to 5 //! rejection from the App Store) and because they are only available starting 6 //! with macOS version 10.12, even though the minimum target version is 10.7. 7 //! 8 //! Therefore, we need to look for other synchronization primitives. Luckily, Darwin 9 //! supports semaphores, which allow us to implement the behaviour we need with 10 //! only one primitive (as opposed to a mutex-condvar pair). We use the semaphore 11 //! provided by libdispatch, as the underlying Mach semaphore is only dubiously 12 //! public. 13 14 use crate::pin::Pin; 15 use crate::sync::atomic::{ 16 AtomicI8, 17 Ordering::{Acquire, Release}, 18 }; 19 use crate::time::Duration; 20 21 type dispatch_semaphore_t = *mut crate::ffi::c_void; 22 type dispatch_time_t = u64; 23 24 const DISPATCH_TIME_NOW: dispatch_time_t = 0; 25 const DISPATCH_TIME_FOREVER: dispatch_time_t = !0; 26 27 // Contained in libSystem.dylib, which is linked by default. 28 extern "C" { dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t29 fn dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t; dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t30 fn dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t; dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize31 fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize; dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize32 fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize; dispatch_release(object: *mut crate::ffi::c_void)33 fn dispatch_release(object: *mut crate::ffi::c_void); 34 } 35 36 const EMPTY: i8 = 0; 37 const NOTIFIED: i8 = 1; 38 const PARKED: i8 = -1; 39 40 pub struct Parker { 41 semaphore: dispatch_semaphore_t, 42 state: AtomicI8, 43 } 44 45 unsafe impl Sync for Parker {} 46 unsafe impl Send for Parker {} 47 48 impl Parker { new_in_place(parker: *mut Parker)49 pub unsafe fn new_in_place(parker: *mut Parker) { 50 let semaphore = dispatch_semaphore_create(0); 51 assert!( 52 !semaphore.is_null(), 53 "failed to create dispatch semaphore for thread synchronization" 54 ); 55 parker.write(Parker { semaphore, state: AtomicI8::new(EMPTY) }) 56 } 57 58 // Does not need `Pin`, but other implementation do. park(self: Pin<&Self>)59 pub unsafe fn park(self: Pin<&Self>) { 60 // The semaphore counter must be zero at this point, because unparking 61 // threads will not actually increase it until we signalled that we 62 // are waiting. 63 64 // Change NOTIFIED to EMPTY and EMPTY to PARKED. 65 if self.state.fetch_sub(1, Acquire) == NOTIFIED { 66 return; 67 } 68 69 // Another thread may increase the semaphore counter from this point on. 70 // If it is faster than us, we will decrement it again immediately below. 71 // If we are faster, we wait. 72 73 // Ensure that the semaphore counter has actually been decremented, even 74 // if the call timed out for some reason. 75 while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {} 76 77 // At this point, the semaphore counter is zero again. 78 79 // We were definitely woken up, so we don't need to check the state. 80 // Still, we need to reset the state using a swap to observe the state 81 // change with acquire ordering. 82 self.state.swap(EMPTY, Acquire); 83 } 84 85 // Does not need `Pin`, but other implementation do. park_timeout(self: Pin<&Self>, dur: Duration)86 pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { 87 if self.state.fetch_sub(1, Acquire) == NOTIFIED { 88 return; 89 } 90 91 let nanos = dur.as_nanos().try_into().unwrap_or(i64::MAX); 92 let timeout = dispatch_time(DISPATCH_TIME_NOW, nanos); 93 94 let timeout = dispatch_semaphore_wait(self.semaphore, timeout) != 0; 95 96 let state = self.state.swap(EMPTY, Acquire); 97 if state == NOTIFIED && timeout { 98 // If the state was NOTIFIED but semaphore_wait returned without 99 // decrementing the count because of a timeout, it means another 100 // thread is about to call semaphore_signal. We must wait for that 101 // to happen to ensure the semaphore count is reset. 102 while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {} 103 } else { 104 // Either a timeout occurred and we reset the state before any thread 105 // tried to wake us up, or we were woken up and reset the state, 106 // making sure to observe the state change with acquire ordering. 107 // Either way, the semaphore counter is now zero again. 108 } 109 } 110 111 // Does not need `Pin`, but other implementation do. unpark(self: Pin<&Self>)112 pub fn unpark(self: Pin<&Self>) { 113 let state = self.state.swap(NOTIFIED, Release); 114 if state == PARKED { 115 unsafe { 116 dispatch_semaphore_signal(self.semaphore); 117 } 118 } 119 } 120 } 121 122 impl Drop for Parker { drop(&mut self)123 fn drop(&mut self) { 124 // SAFETY: 125 // We always ensure that the semaphore count is reset, so this will 126 // never cause an exception. 127 unsafe { 128 dispatch_release(self.semaphore); 129 } 130 } 131 } 132