1 // Copyright 2020 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::cell::UnsafeCell; 6 use std::future::Future; 7 use std::mem; 8 use std::pin::Pin; 9 use std::ptr::NonNull; 10 use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; 11 use std::sync::Arc; 12 use std::task::{Context, Poll, Waker}; 13 14 use intrusive_collections::linked_list::{LinkedList, LinkedListOps}; 15 use intrusive_collections::{intrusive_adapter, DefaultLinkOps, LinkOps}; 16 17 use crate::sync::SpinLock; 18 19 // An atomic version of a LinkedListLink. See https://github.com/Amanieu/intrusive-rs/issues/47 for 20 // more details. 21 #[repr(align(128))] 22 pub struct AtomicLink { 23 prev: UnsafeCell<Option<NonNull<AtomicLink>>>, 24 next: UnsafeCell<Option<NonNull<AtomicLink>>>, 25 linked: AtomicBool, 26 } 27 28 impl AtomicLink { new() -> AtomicLink29 fn new() -> AtomicLink { 30 AtomicLink { 31 linked: AtomicBool::new(false), 32 prev: UnsafeCell::new(None), 33 next: UnsafeCell::new(None), 34 } 35 } 36 is_linked(&self) -> bool37 fn is_linked(&self) -> bool { 38 self.linked.load(Ordering::Relaxed) 39 } 40 } 41 42 impl DefaultLinkOps for AtomicLink { 43 type Ops = AtomicLinkOps; 44 45 const NEW: Self::Ops = AtomicLinkOps; 46 } 47 48 // Safe because the only way to mutate `AtomicLink` is via the `LinkedListOps` trait whose methods 49 // are all unsafe and require that the caller has first called `acquire_link` (and had it return 50 // true) to use them safely. 51 unsafe impl Send for AtomicLink {} 52 unsafe impl Sync for AtomicLink {} 53 54 #[derive(Copy, Clone, Default)] 55 pub struct AtomicLinkOps; 56 57 unsafe impl LinkOps for AtomicLinkOps { 58 type LinkPtr = NonNull<AtomicLink>; 59 acquire_link(&mut self, ptr: Self::LinkPtr) -> bool60 unsafe fn acquire_link(&mut self, ptr: Self::LinkPtr) -> bool { 61 !ptr.as_ref().linked.swap(true, Ordering::Acquire) 62 } 63 release_link(&mut self, ptr: Self::LinkPtr)64 unsafe fn release_link(&mut self, ptr: Self::LinkPtr) { 65 ptr.as_ref().linked.store(false, Ordering::Release) 66 } 67 } 68 69 unsafe impl LinkedListOps for AtomicLinkOps { next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr>70 unsafe fn next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> { 71 *ptr.as_ref().next.get() 72 } 73 prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr>74 unsafe fn prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> { 75 *ptr.as_ref().prev.get() 76 } 77 set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>)78 unsafe fn set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>) { 79 *ptr.as_ref().next.get() = next; 80 } 81 set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>)82 unsafe fn set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>) { 83 *ptr.as_ref().prev.get() = prev; 84 } 85 } 86 87 #[derive(Clone, Copy)] 88 pub enum Kind { 89 Shared, 90 Exclusive, 91 } 92 93 enum State { 94 Init, 95 Waiting(Waker), 96 Woken, 97 Finished, 98 Processing, 99 } 100 101 // Indicates the queue to which the waiter belongs. It is the responsibility of the Mutex and 102 // Condvar implementations to update this value when adding/removing a Waiter from their respective 103 // waiter lists. 104 #[repr(u8)] 105 #[derive(Debug, Eq, PartialEq)] 106 pub enum WaitingFor { 107 // The waiter is either not linked into a waiter list or it is linked into a temporary list. 108 None = 0, 109 // The waiter is linked into the Mutex's waiter list. 110 Mutex = 1, 111 // The waiter is linked into the Condvar's waiter list. 112 Condvar = 2, 113 } 114 115 // Represents a thread currently blocked on a Condvar or on acquiring a Mutex. 116 pub struct Waiter { 117 link: AtomicLink, 118 state: SpinLock<State>, 119 cancel: fn(usize, &Waiter, bool), 120 cancel_data: usize, 121 kind: Kind, 122 waiting_for: AtomicU8, 123 } 124 125 impl Waiter { 126 // Create a new, initialized Waiter. 127 // 128 // `kind` should indicate whether this waiter represent a thread that is waiting for a shared 129 // lock or an exclusive lock. 130 // 131 // `cancel` is the function that is called when a `WaitFuture` (returned by the `wait()` 132 // function) is dropped before it can complete. `cancel_data` is used as the first parameter of 133 // the `cancel` function. The second parameter is the `Waiter` that was canceled and the third 134 // parameter indicates whether the `WaitFuture` was dropped after it was woken (but before it 135 // was polled to completion). A value of `false` for the third parameter may already be stale 136 // by the time the cancel function runs and so does not guarantee that the waiter was not woken. 137 // In this case, implementations should still check if the Waiter was woken. However, a value of 138 // `true` guarantees that the waiter was already woken up so no additional checks are necessary. 139 // In this case, the cancel implementation should wake up the next waiter in its wait list, if 140 // any. 141 // 142 // `waiting_for` indicates the waiter list to which this `Waiter` will be added. See the 143 // documentation of the `WaitingFor` enum for the meaning of the different values. new( kind: Kind, cancel: fn(usize, &Waiter, bool), cancel_data: usize, waiting_for: WaitingFor, ) -> Waiter144 pub fn new( 145 kind: Kind, 146 cancel: fn(usize, &Waiter, bool), 147 cancel_data: usize, 148 waiting_for: WaitingFor, 149 ) -> Waiter { 150 Waiter { 151 link: AtomicLink::new(), 152 state: SpinLock::new(State::Init), 153 cancel, 154 cancel_data, 155 kind, 156 waiting_for: AtomicU8::new(waiting_for as u8), 157 } 158 } 159 160 // The kind of lock that this `Waiter` is waiting to acquire. kind(&self) -> Kind161 pub fn kind(&self) -> Kind { 162 self.kind 163 } 164 165 // Returns true if this `Waiter` is currently linked into a waiter list. is_linked(&self) -> bool166 pub fn is_linked(&self) -> bool { 167 self.link.is_linked() 168 } 169 170 // Indicates the waiter list to which this `Waiter` belongs. is_waiting_for(&self) -> WaitingFor171 pub fn is_waiting_for(&self) -> WaitingFor { 172 match self.waiting_for.load(Ordering::Acquire) { 173 0 => WaitingFor::None, 174 1 => WaitingFor::Mutex, 175 2 => WaitingFor::Condvar, 176 v => panic!("Unknown value for `WaitingFor`: {}", v), 177 } 178 } 179 180 // Change the waiter list to which this `Waiter` belongs. This will panic if called when the 181 // `Waiter` is still linked into a waiter list. set_waiting_for(&self, waiting_for: WaitingFor)182 pub fn set_waiting_for(&self, waiting_for: WaitingFor) { 183 self.waiting_for.store(waiting_for as u8, Ordering::Release); 184 } 185 186 // Reset the Waiter back to its initial state. Panics if this `Waiter` is still linked into a 187 // waiter list. reset(&self, waiting_for: WaitingFor)188 pub fn reset(&self, waiting_for: WaitingFor) { 189 debug_assert!(!self.is_linked(), "Cannot reset `Waiter` while linked"); 190 self.set_waiting_for(waiting_for); 191 192 let mut state = self.state.lock(); 193 if let State::Waiting(waker) = mem::replace(&mut *state, State::Init) { 194 mem::drop(state); 195 mem::drop(waker); 196 } 197 } 198 199 // Wait until woken up by another thread. wait(&self) -> WaitFuture<'_>200 pub fn wait(&self) -> WaitFuture<'_> { 201 WaitFuture { waiter: self } 202 } 203 204 // Wake up the thread associated with this `Waiter`. Panics if `waiting_for()` does not return 205 // `WaitingFor::None` or if `is_linked()` returns true. wake(&self)206 pub fn wake(&self) { 207 debug_assert!(!self.is_linked(), "Cannot wake `Waiter` while linked"); 208 debug_assert_eq!(self.is_waiting_for(), WaitingFor::None); 209 210 let mut state = self.state.lock(); 211 212 if let State::Waiting(waker) = mem::replace(&mut *state, State::Woken) { 213 mem::drop(state); 214 waker.wake(); 215 } 216 } 217 } 218 219 pub struct WaitFuture<'w> { 220 waiter: &'w Waiter, 221 } 222 223 impl<'w> Future for WaitFuture<'w> { 224 type Output = (); 225 poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>226 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { 227 let mut state = self.waiter.state.lock(); 228 229 match mem::replace(&mut *state, State::Processing) { 230 State::Init => { 231 *state = State::Waiting(cx.waker().clone()); 232 233 Poll::Pending 234 } 235 State::Waiting(old_waker) => { 236 *state = State::Waiting(cx.waker().clone()); 237 mem::drop(state); 238 mem::drop(old_waker); 239 240 Poll::Pending 241 } 242 State::Woken => { 243 *state = State::Finished; 244 Poll::Ready(()) 245 } 246 State::Finished => { 247 panic!("Future polled after returning Poll::Ready"); 248 } 249 State::Processing => { 250 panic!("Unexpected waker state"); 251 } 252 } 253 } 254 } 255 256 impl<'w> Drop for WaitFuture<'w> { drop(&mut self)257 fn drop(&mut self) { 258 let state = self.waiter.state.lock(); 259 260 match *state { 261 State::Finished => {} 262 State::Processing => panic!("Unexpected waker state"), 263 State::Woken => { 264 mem::drop(state); 265 266 // We were woken but not polled. Wake up the next waiter. 267 (self.waiter.cancel)(self.waiter.cancel_data, self.waiter, true); 268 } 269 _ => { 270 mem::drop(state); 271 272 // Not woken. No need to wake up any waiters. 273 (self.waiter.cancel)(self.waiter.cancel_data, self.waiter, false); 274 } 275 } 276 } 277 } 278 279 intrusive_adapter!(pub WaiterAdapter = Arc<Waiter>: Waiter { link: AtomicLink }); 280 281 pub type WaiterList = LinkedList<WaiterAdapter>; 282