1 // Copyright 2020 The Chromium OS Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 use std::cell::UnsafeCell; 6 use std::future::Future; 7 use std::mem; 8 use std::pin::Pin; 9 use std::ptr::NonNull; 10 use std::sync::atomic::{AtomicBool, AtomicU8, Ordering}; 11 use std::sync::Arc; 12 use std::task::{Context, Poll, Waker}; 13 14 use intrusive_collections::linked_list::{LinkedList, LinkedListOps}; 15 use intrusive_collections::{intrusive_adapter, DefaultLinkOps, LinkOps}; 16 17 use crate::sync::SpinLock; 18 19 // An atomic version of a LinkedListLink. See https://github.com/Amanieu/intrusive-rs/issues/47 for 20 // more details. 21 pub struct AtomicLink { 22 prev: UnsafeCell<Option<NonNull<AtomicLink>>>, 23 next: UnsafeCell<Option<NonNull<AtomicLink>>>, 24 linked: AtomicBool, 25 } 26 27 impl AtomicLink { new() -> AtomicLink28 fn new() -> AtomicLink { 29 AtomicLink { 30 linked: AtomicBool::new(false), 31 prev: UnsafeCell::new(None), 32 next: UnsafeCell::new(None), 33 } 34 } 35 is_linked(&self) -> bool36 fn is_linked(&self) -> bool { 37 self.linked.load(Ordering::Relaxed) 38 } 39 } 40 41 impl DefaultLinkOps for AtomicLink { 42 type Ops = AtomicLinkOps; 43 44 const NEW: Self::Ops = AtomicLinkOps; 45 } 46 47 // Safe because the only way to mutate `AtomicLink` is via the `LinkedListOps` trait whose methods 48 // are all unsafe and require that the caller has first called `acquire_link` (and had it return 49 // true) to use them safely. 50 unsafe impl Send for AtomicLink {} 51 unsafe impl Sync for AtomicLink {} 52 53 #[derive(Copy, Clone, Default)] 54 pub struct AtomicLinkOps; 55 56 unsafe impl LinkOps for AtomicLinkOps { 57 type LinkPtr = NonNull<AtomicLink>; 58 acquire_link(&mut self, ptr: Self::LinkPtr) -> bool59 unsafe fn acquire_link(&mut self, ptr: Self::LinkPtr) -> bool { 60 !ptr.as_ref().linked.swap(true, Ordering::Acquire) 61 } 62 release_link(&mut self, ptr: Self::LinkPtr)63 unsafe fn release_link(&mut self, ptr: Self::LinkPtr) { 64 ptr.as_ref().linked.store(false, Ordering::Release) 65 } 66 } 67 68 unsafe impl LinkedListOps for AtomicLinkOps { next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr>69 unsafe fn next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> { 70 *ptr.as_ref().next.get() 71 } 72 prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr>73 unsafe fn prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> { 74 *ptr.as_ref().prev.get() 75 } 76 set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>)77 unsafe fn set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>) { 78 *ptr.as_ref().next.get() = next; 79 } 80 set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>)81 unsafe fn set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>) { 82 *ptr.as_ref().prev.get() = prev; 83 } 84 } 85 86 #[derive(Clone, Copy)] 87 pub enum Kind { 88 Shared, 89 Exclusive, 90 } 91 92 enum State { 93 Init, 94 Waiting(Waker), 95 Woken, 96 Finished, 97 Processing, 98 } 99 100 // Indicates the queue to which the waiter belongs. It is the responsibility of the Mutex and 101 // Condvar implementations to update this value when adding/removing a Waiter from their respective 102 // waiter lists. 103 #[repr(u8)] 104 #[derive(Debug, Eq, PartialEq)] 105 pub enum WaitingFor { 106 // The waiter is either not linked into a waiter list or it is linked into a temporary list. 107 None = 0, 108 // The waiter is linked into the Mutex's waiter list. 109 Mutex = 1, 110 // The waiter is linked into the Condvar's waiter list. 111 Condvar = 2, 112 } 113 114 // Internal struct used to keep track of the cancellation function. 115 struct Cancel { 116 c: fn(usize, &Waiter, bool) -> bool, 117 data: usize, 118 } 119 120 // Represents a thread currently blocked on a Condvar or on acquiring a Mutex. 121 pub struct Waiter { 122 link: AtomicLink, 123 state: SpinLock<State>, 124 cancel: SpinLock<Cancel>, 125 kind: Kind, 126 waiting_for: AtomicU8, 127 } 128 129 impl Waiter { 130 // Create a new, initialized Waiter. 131 // 132 // `kind` should indicate whether this waiter represent a thread that is waiting for a shared 133 // lock or an exclusive lock. 134 // 135 // `cancel` is the function that is called when a `WaitFuture` (returned by the `wait()` 136 // function) is dropped before it can complete. `cancel_data` is used as the first parameter of 137 // the `cancel` function. The second parameter is the `Waiter` that was canceled and the third 138 // parameter indicates whether the `WaitFuture` was dropped after it was woken (but before it 139 // was polled to completion). The `cancel` function should return true if it was able to 140 // successfully process the cancellation. One reason why a `cancel` function may return false is 141 // if the `Waiter` was transferred to a different waiter list after the cancel function was 142 // called but before it was able to run. In this case, it is expected that the new waiter list 143 // updated the cancel function (by calling `set_cancel`) and the cancellation will be retried by 144 // fetching and calling the new cancellation function. 145 // 146 // `waiting_for` indicates the waiter list to which this `Waiter` will be added. See the 147 // documentation of the `WaitingFor` enum for the meaning of the different values. new( kind: Kind, cancel: fn(usize, &Waiter, bool) -> bool, cancel_data: usize, waiting_for: WaitingFor, ) -> Waiter148 pub fn new( 149 kind: Kind, 150 cancel: fn(usize, &Waiter, bool) -> bool, 151 cancel_data: usize, 152 waiting_for: WaitingFor, 153 ) -> Waiter { 154 Waiter { 155 link: AtomicLink::new(), 156 state: SpinLock::new(State::Init), 157 cancel: SpinLock::new(Cancel { 158 c: cancel, 159 data: cancel_data, 160 }), 161 kind, 162 waiting_for: AtomicU8::new(waiting_for as u8), 163 } 164 } 165 166 // The kind of lock that this `Waiter` is waiting to acquire. kind(&self) -> Kind167 pub fn kind(&self) -> Kind { 168 self.kind 169 } 170 171 // Returns true if this `Waiter` is currently linked into a waiter list. is_linked(&self) -> bool172 pub fn is_linked(&self) -> bool { 173 self.link.is_linked() 174 } 175 176 // Indicates the waiter list to which this `Waiter` belongs. is_waiting_for(&self) -> WaitingFor177 pub fn is_waiting_for(&self) -> WaitingFor { 178 match self.waiting_for.load(Ordering::Acquire) { 179 0 => WaitingFor::None, 180 1 => WaitingFor::Mutex, 181 2 => WaitingFor::Condvar, 182 v => panic!("Unknown value for `WaitingFor`: {}", v), 183 } 184 } 185 186 // Change the waiter list to which this `Waiter` belongs. This will panic if called when the 187 // `Waiter` is still linked into a waiter list. set_waiting_for(&self, waiting_for: WaitingFor)188 pub fn set_waiting_for(&self, waiting_for: WaitingFor) { 189 self.waiting_for.store(waiting_for as u8, Ordering::Release); 190 } 191 192 // Change the cancellation function that this `Waiter` should use. This will panic if called 193 // when the `Waiter` is still linked into a waiter list. set_cancel(&self, c: fn(usize, &Waiter, bool) -> bool, data: usize)194 pub fn set_cancel(&self, c: fn(usize, &Waiter, bool) -> bool, data: usize) { 195 debug_assert!( 196 !self.is_linked(), 197 "Cannot change cancellation function while linked" 198 ); 199 let mut cancel = self.cancel.lock(); 200 cancel.c = c; 201 cancel.data = data; 202 } 203 204 // Reset the Waiter back to its initial state. Panics if this `Waiter` is still linked into a 205 // waiter list. reset(&self, waiting_for: WaitingFor)206 pub fn reset(&self, waiting_for: WaitingFor) { 207 debug_assert!(!self.is_linked(), "Cannot reset `Waiter` while linked"); 208 self.set_waiting_for(waiting_for); 209 210 let mut state = self.state.lock(); 211 if let State::Waiting(waker) = mem::replace(&mut *state, State::Init) { 212 mem::drop(state); 213 mem::drop(waker); 214 } 215 } 216 217 // Wait until woken up by another thread. wait(&self) -> WaitFuture<'_>218 pub fn wait(&self) -> WaitFuture<'_> { 219 WaitFuture { waiter: self } 220 } 221 222 // Wake up the thread associated with this `Waiter`. Panics if `waiting_for()` does not return 223 // `WaitingFor::None` or if `is_linked()` returns true. wake(&self)224 pub fn wake(&self) { 225 debug_assert!(!self.is_linked(), "Cannot wake `Waiter` while linked"); 226 debug_assert_eq!(self.is_waiting_for(), WaitingFor::None); 227 228 let mut state = self.state.lock(); 229 230 if let State::Waiting(waker) = mem::replace(&mut *state, State::Woken) { 231 mem::drop(state); 232 waker.wake(); 233 } 234 } 235 } 236 237 pub struct WaitFuture<'w> { 238 waiter: &'w Waiter, 239 } 240 241 impl<'w> Future for WaitFuture<'w> { 242 type Output = (); 243 poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>244 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { 245 let mut state = self.waiter.state.lock(); 246 247 match mem::replace(&mut *state, State::Processing) { 248 State::Init => { 249 *state = State::Waiting(cx.waker().clone()); 250 251 Poll::Pending 252 } 253 State::Waiting(old_waker) => { 254 *state = State::Waiting(cx.waker().clone()); 255 mem::drop(state); 256 mem::drop(old_waker); 257 258 Poll::Pending 259 } 260 State::Woken => { 261 *state = State::Finished; 262 Poll::Ready(()) 263 } 264 State::Finished => { 265 panic!("Future polled after returning Poll::Ready"); 266 } 267 State::Processing => { 268 panic!("Unexpected waker state"); 269 } 270 } 271 } 272 } 273 274 impl<'w> Drop for WaitFuture<'w> { drop(&mut self)275 fn drop(&mut self) { 276 let state = self.waiter.state.lock(); 277 278 match *state { 279 State::Finished => {} 280 State::Processing => panic!("Unexpected waker state"), 281 State::Woken => { 282 mem::drop(state); 283 284 // We were woken but not polled. Wake up the next waiter. 285 let mut success = false; 286 while !success { 287 let cancel = self.waiter.cancel.lock(); 288 let c = cancel.c; 289 let data = cancel.data; 290 291 mem::drop(cancel); 292 293 success = c(data, self.waiter, true); 294 } 295 } 296 _ => { 297 mem::drop(state); 298 299 // Not woken. No need to wake up any waiters. 300 let mut success = false; 301 while !success { 302 let cancel = self.waiter.cancel.lock(); 303 let c = cancel.c; 304 let data = cancel.data; 305 306 mem::drop(cancel); 307 308 success = c(data, self.waiter, false); 309 } 310 } 311 } 312 } 313 } 314 315 intrusive_adapter!(pub WaiterAdapter = Arc<Waiter>: Waiter { link: AtomicLink }); 316 317 pub type WaiterList = LinkedList<WaiterAdapter>; 318