1 use crate::loom::sync::atomic::AtomicUsize; 2 3 use std::fmt; 4 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; 5 use std::usize; 6 7 pub(super) struct State { 8 val: AtomicUsize, 9 } 10 11 /// Current state value 12 #[derive(Copy, Clone)] 13 pub(super) struct Snapshot(usize); 14 15 type UpdateResult = Result<Snapshot, Snapshot>; 16 17 /// The task is currently being run. 18 const RUNNING: usize = 0b0001; 19 20 /// The task is complete. 21 /// 22 /// Once this bit is set, it is never unset 23 const COMPLETE: usize = 0b0010; 24 25 /// Extracts the task's lifecycle value from the state 26 const LIFECYCLE_MASK: usize = 0b11; 27 28 /// Flag tracking if the task has been pushed into a run queue. 29 const NOTIFIED: usize = 0b100; 30 31 /// The join handle is still around 32 const JOIN_INTEREST: usize = 0b1_000; 33 34 /// A join handle waker has been set 35 const JOIN_WAKER: usize = 0b10_000; 36 37 /// The task has been forcibly cancelled. 38 const CANCELLED: usize = 0b100_000; 39 40 /// All bits 41 const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED; 42 43 /// Bits used by the ref count portion of the state. 44 const REF_COUNT_MASK: usize = !STATE_MASK; 45 46 /// Number of positions to shift the ref count 47 const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize; 48 49 /// One ref count 50 const REF_ONE: usize = 1 << REF_COUNT_SHIFT; 51 52 /// State a task is initialized with 53 /// 54 /// A task is initialized with two references: one for the scheduler and one for 55 /// the `JoinHandle`. As the task starts with a `JoinHandle`, `JOIN_INTERST` is 56 /// set. A new task is immediately pushed into the run queue for execution and 57 /// starts with the `NOTIFIED` flag set. 58 const INITIAL_STATE: usize = (REF_ONE * 2) | JOIN_INTEREST | NOTIFIED; 59 60 /// All transitions are performed via RMW operations. This establishes an 61 /// unambiguous modification order. 62 impl State { 63 /// Return a task's initial state new() -> State64 pub(super) fn new() -> State { 65 // A task is initialized with three references: one for the scheduler, 66 // one for the `JoinHandle`, one for the task handle made available in 67 // release. As the task starts with a `JoinHandle`, `JOIN_INTERST` is 68 // set. A new task is immediately pushed into the run queue for 69 // execution and starts with the `NOTIFIED` flag set. 70 State { 71 val: AtomicUsize::new(INITIAL_STATE), 72 } 73 } 74 75 /// Loads the current state, establishes `Acquire` ordering. load(&self) -> Snapshot76 pub(super) fn load(&self) -> Snapshot { 77 Snapshot(self.val.load(Acquire)) 78 } 79 80 /// Attempt to transition the lifecycle to `Running`. 81 /// 82 /// If `ref_inc` is set, the reference count is also incremented. 83 /// 84 /// The `NOTIFIED` bit is always unset. transition_to_running(&self, ref_inc: bool) -> UpdateResult85 pub(super) fn transition_to_running(&self, ref_inc: bool) -> UpdateResult { 86 self.fetch_update(|curr| { 87 assert!(curr.is_notified()); 88 89 let mut next = curr; 90 91 if !next.is_idle() { 92 return None; 93 } 94 95 if ref_inc { 96 next.ref_inc(); 97 } 98 99 next.set_running(); 100 next.unset_notified(); 101 Some(next) 102 }) 103 } 104 105 /// Transitions the task from `Running` -> `Idle`. 106 /// 107 /// Returns `Ok` if the transition to `Idle` is successful, `Err` otherwise. 108 /// In both cases, a snapshot of the state from **after** the transition is 109 /// returned. 110 /// 111 /// The transition to `Idle` fails if the task has been flagged to be 112 /// cancelled. transition_to_idle(&self) -> UpdateResult113 pub(super) fn transition_to_idle(&self) -> UpdateResult { 114 self.fetch_update(|curr| { 115 assert!(curr.is_running()); 116 117 if curr.is_cancelled() { 118 return None; 119 } 120 121 let mut next = curr; 122 next.unset_running(); 123 124 if next.is_notified() { 125 // The caller needs to schedule the task. To do this, it needs a 126 // waker. The waker requires a ref count. 127 next.ref_inc(); 128 } 129 130 Some(next) 131 }) 132 } 133 134 /// Transitions the task from `Running` -> `Complete`. transition_to_complete(&self) -> Snapshot135 pub(super) fn transition_to_complete(&self) -> Snapshot { 136 const DELTA: usize = RUNNING | COMPLETE; 137 138 let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel)); 139 assert!(prev.is_running()); 140 assert!(!prev.is_complete()); 141 142 Snapshot(prev.0 ^ DELTA) 143 } 144 145 /// Transition from `Complete` -> `Terminal`, decrementing the reference 146 /// count by 1. 147 /// 148 /// When `ref_dec` is set, an additional ref count decrement is performed. 149 /// This is used to batch atomic ops when possible. transition_to_terminal(&self, complete: bool, ref_dec: bool) -> Snapshot150 pub(super) fn transition_to_terminal(&self, complete: bool, ref_dec: bool) -> Snapshot { 151 self.fetch_update(|mut snapshot| { 152 if complete { 153 snapshot.set_complete(); 154 } else { 155 assert!(snapshot.is_complete()); 156 } 157 158 // Decrement the primary handle 159 snapshot.ref_dec(); 160 161 if ref_dec { 162 // Decrement a second time 163 snapshot.ref_dec(); 164 } 165 166 Some(snapshot) 167 }) 168 .unwrap() 169 } 170 171 /// Transitions the state to `NOTIFIED`. 172 /// 173 /// Returns `true` if the task needs to be submitted to the pool for 174 /// execution transition_to_notified(&self) -> bool175 pub(super) fn transition_to_notified(&self) -> bool { 176 let prev = Snapshot(self.val.fetch_or(NOTIFIED, AcqRel)); 177 prev.will_need_queueing() 178 } 179 180 /// Set the `CANCELLED` bit and attempt to transition to `Running`. 181 /// 182 /// Returns `true` if the transition to `Running` succeeded. transition_to_shutdown(&self) -> bool183 pub(super) fn transition_to_shutdown(&self) -> bool { 184 let mut prev = Snapshot(0); 185 186 let _ = self.fetch_update(|mut snapshot| { 187 prev = snapshot; 188 189 if snapshot.is_idle() { 190 snapshot.set_running(); 191 192 if snapshot.is_notified() { 193 // If the task is idle and notified, this indicates the task is 194 // in the run queue and is considered owned by the scheduler. 195 // The shutdown operation claims ownership of the task, which 196 // means we need to assign an additional ref-count to the task 197 // in the queue. 198 snapshot.ref_inc(); 199 } 200 } 201 202 snapshot.set_cancelled(); 203 Some(snapshot) 204 }); 205 206 prev.is_idle() 207 } 208 209 /// Optimistically tries to swap the state assuming the join handle is 210 /// __immediately__ dropped on spawn drop_join_handle_fast(&self) -> Result<(), ()>211 pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> { 212 use std::sync::atomic::Ordering::Relaxed; 213 214 // Relaxed is acceptable as if this function is called and succeeds, 215 // then nothing has been done w/ the join handle. 216 // 217 // The moment the join handle is used (polled), the `JOIN_WAKER` flag is 218 // set, at which point the CAS will fail. 219 // 220 // Given this, there is no risk if this operation is reordered. 221 self.val 222 .compare_exchange_weak( 223 INITIAL_STATE, 224 (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST, 225 Release, 226 Relaxed, 227 ) 228 .map(|_| ()) 229 .map_err(|_| ()) 230 } 231 232 /// Try to unset the JOIN_INTEREST flag. 233 /// 234 /// Returns `Ok` if the operation happens before the task transitions to a 235 /// completed state, `Err` otherwise. unset_join_interested(&self) -> UpdateResult236 pub(super) fn unset_join_interested(&self) -> UpdateResult { 237 self.fetch_update(|curr| { 238 assert!(curr.is_join_interested()); 239 240 if curr.is_complete() { 241 return None; 242 } 243 244 let mut next = curr; 245 next.unset_join_interested(); 246 247 Some(next) 248 }) 249 } 250 251 /// Set the `JOIN_WAKER` bit. 252 /// 253 /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if 254 /// the task has completed. set_join_waker(&self) -> UpdateResult255 pub(super) fn set_join_waker(&self) -> UpdateResult { 256 self.fetch_update(|curr| { 257 assert!(curr.is_join_interested()); 258 assert!(!curr.has_join_waker()); 259 260 if curr.is_complete() { 261 return None; 262 } 263 264 let mut next = curr; 265 next.set_join_waker(); 266 267 Some(next) 268 }) 269 } 270 271 /// Unsets the `JOIN_WAKER` bit. 272 /// 273 /// Returns `Ok` has been unset, `Err` otherwise. This operation fails if 274 /// the task has completed. unset_waker(&self) -> UpdateResult275 pub(super) fn unset_waker(&self) -> UpdateResult { 276 self.fetch_update(|curr| { 277 assert!(curr.is_join_interested()); 278 assert!(curr.has_join_waker()); 279 280 if curr.is_complete() { 281 return None; 282 } 283 284 let mut next = curr; 285 next.unset_join_waker(); 286 287 Some(next) 288 }) 289 } 290 ref_inc(&self)291 pub(super) fn ref_inc(&self) { 292 use std::process; 293 use std::sync::atomic::Ordering::Relaxed; 294 295 // Using a relaxed ordering is alright here, as knowledge of the 296 // original reference prevents other threads from erroneously deleting 297 // the object. 298 // 299 // As explained in the [Boost documentation][1], Increasing the 300 // reference counter can always be done with memory_order_relaxed: New 301 // references to an object can only be formed from an existing 302 // reference, and passing an existing reference from one thread to 303 // another must already provide any required synchronization. 304 // 305 // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) 306 let prev = self.val.fetch_add(REF_ONE, Relaxed); 307 308 // If the reference count overflowed, abort. 309 if prev > isize::max_value() as usize { 310 process::abort(); 311 } 312 } 313 314 /// Returns `true` if the task should be released. ref_dec(&self) -> bool315 pub(super) fn ref_dec(&self) -> bool { 316 let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel)); 317 prev.ref_count() == 1 318 } 319 fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot> where F: FnMut(Snapshot) -> Option<Snapshot>,320 fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot> 321 where 322 F: FnMut(Snapshot) -> Option<Snapshot>, 323 { 324 let mut curr = self.load(); 325 326 loop { 327 let next = match f(curr) { 328 Some(next) => next, 329 None => return Err(curr), 330 }; 331 332 let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire); 333 334 match res { 335 Ok(_) => return Ok(next), 336 Err(actual) => curr = Snapshot(actual), 337 } 338 } 339 } 340 } 341 342 // ===== impl Snapshot ===== 343 344 impl Snapshot { 345 /// Returns `true` if the task is in an idle state. is_idle(self) -> bool346 pub(super) fn is_idle(self) -> bool { 347 self.0 & (RUNNING | COMPLETE) == 0 348 } 349 350 /// Returns `true` if the task has been flagged as notified. is_notified(self) -> bool351 pub(super) fn is_notified(self) -> bool { 352 self.0 & NOTIFIED == NOTIFIED 353 } 354 unset_notified(&mut self)355 fn unset_notified(&mut self) { 356 self.0 &= !NOTIFIED 357 } 358 is_running(self) -> bool359 pub(super) fn is_running(self) -> bool { 360 self.0 & RUNNING == RUNNING 361 } 362 set_running(&mut self)363 fn set_running(&mut self) { 364 self.0 |= RUNNING; 365 } 366 unset_running(&mut self)367 fn unset_running(&mut self) { 368 self.0 &= !RUNNING; 369 } 370 is_cancelled(self) -> bool371 pub(super) fn is_cancelled(self) -> bool { 372 self.0 & CANCELLED == CANCELLED 373 } 374 set_cancelled(&mut self)375 fn set_cancelled(&mut self) { 376 self.0 |= CANCELLED; 377 } 378 set_complete(&mut self)379 fn set_complete(&mut self) { 380 self.0 |= COMPLETE; 381 } 382 383 /// Returns `true` if the task's future has completed execution. is_complete(self) -> bool384 pub(super) fn is_complete(self) -> bool { 385 self.0 & COMPLETE == COMPLETE 386 } 387 is_join_interested(self) -> bool388 pub(super) fn is_join_interested(self) -> bool { 389 self.0 & JOIN_INTEREST == JOIN_INTEREST 390 } 391 unset_join_interested(&mut self)392 fn unset_join_interested(&mut self) { 393 self.0 &= !JOIN_INTEREST 394 } 395 has_join_waker(self) -> bool396 pub(super) fn has_join_waker(self) -> bool { 397 self.0 & JOIN_WAKER == JOIN_WAKER 398 } 399 set_join_waker(&mut self)400 fn set_join_waker(&mut self) { 401 self.0 |= JOIN_WAKER; 402 } 403 unset_join_waker(&mut self)404 fn unset_join_waker(&mut self) { 405 self.0 &= !JOIN_WAKER 406 } 407 ref_count(self) -> usize408 pub(super) fn ref_count(self) -> usize { 409 (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT 410 } 411 ref_inc(&mut self)412 fn ref_inc(&mut self) { 413 assert!(self.0 <= isize::max_value() as usize); 414 self.0 += REF_ONE; 415 } 416 ref_dec(&mut self)417 pub(super) fn ref_dec(&mut self) { 418 assert!(self.ref_count() > 0); 419 self.0 -= REF_ONE 420 } 421 will_need_queueing(self) -> bool422 fn will_need_queueing(self) -> bool { 423 !self.is_notified() && self.is_idle() 424 } 425 } 426 427 impl fmt::Debug for State { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result428 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 429 let snapshot = self.load(); 430 snapshot.fmt(fmt) 431 } 432 } 433 434 impl fmt::Debug for Snapshot { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result435 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 436 fmt.debug_struct("Snapshot") 437 .field("is_running", &self.is_running()) 438 .field("is_complete", &self.is_complete()) 439 .field("is_notified", &self.is_notified()) 440 .field("is_cancelled", &self.is_cancelled()) 441 .field("is_join_interested", &self.is_join_interested()) 442 .field("has_join_waker", &self.has_join_waker()) 443 .field("ref_count", &self.ref_count()) 444 .finish() 445 } 446 } 447