1 use crate::loom::sync::atomic::AtomicUsize; 2 3 use std::fmt; 4 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; 5 use std::usize; 6 7 pub(super) struct State { 8 val: AtomicUsize, 9 } 10 11 /// Current state value. 12 #[derive(Copy, Clone)] 13 pub(super) struct Snapshot(usize); 14 15 type UpdateResult = Result<Snapshot, Snapshot>; 16 17 /// The task is currently being run. 18 const RUNNING: usize = 0b0001; 19 20 /// The task is complete. 21 /// 22 /// Once this bit is set, it is never unset. 23 const COMPLETE: usize = 0b0010; 24 25 /// Extracts the task's lifecycle value from the state. 26 const LIFECYCLE_MASK: usize = 0b11; 27 28 /// Flag tracking if the task has been pushed into a run queue. 29 const NOTIFIED: usize = 0b100; 30 31 /// The join handle is still around. 32 #[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556 33 const JOIN_INTEREST: usize = 0b1_000; 34 35 /// A join handle waker has been set. 36 #[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556 37 const JOIN_WAKER: usize = 0b10_000; 38 39 /// The task has been forcibly cancelled. 40 #[allow(clippy::unusual_byte_groupings)] // https://github.com/rust-lang/rust-clippy/issues/6556 41 const CANCELLED: usize = 0b100_000; 42 43 /// All bits. 44 const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED; 45 46 /// Bits used by the ref count portion of the state. 47 const REF_COUNT_MASK: usize = !STATE_MASK; 48 49 /// Number of positions to shift the ref count. 50 const REF_COUNT_SHIFT: usize = REF_COUNT_MASK.count_zeros() as usize; 51 52 /// One ref count. 53 const REF_ONE: usize = 1 << REF_COUNT_SHIFT; 54 55 /// State a task is initialized with. 56 /// 57 /// A task is initialized with three references: 58 /// 59 /// * A reference that will be stored in an OwnedTasks or LocalOwnedTasks. 60 /// * A reference that will be sent to the scheduler as an ordinary notification. 61 /// * A reference for the JoinHandle. 62 /// 63 /// As the task starts with a `JoinHandle`, `JOIN_INTEREST` is set. 64 /// As the task starts with a `Notified`, `NOTIFIED` is set. 65 const INITIAL_STATE: usize = (REF_ONE * 3) | JOIN_INTEREST | NOTIFIED; 66 67 #[must_use] 68 pub(super) enum TransitionToRunning { 69 Success, 70 Cancelled, 71 Failed, 72 Dealloc, 73 } 74 75 #[must_use] 76 pub(super) enum TransitionToIdle { 77 Ok, 78 OkNotified, 79 OkDealloc, 80 Cancelled, 81 } 82 83 #[must_use] 84 pub(super) enum TransitionToNotifiedByVal { 85 DoNothing, 86 Submit, 87 Dealloc, 88 } 89 90 #[must_use] 91 pub(crate) enum TransitionToNotifiedByRef { 92 DoNothing, 93 Submit, 94 } 95 96 /// All transitions are performed via RMW operations. This establishes an 97 /// unambiguous modification order. 98 impl State { 99 /// Returns a task's initial state. new() -> State100 pub(super) fn new() -> State { 101 // The raw task returned by this method has a ref-count of three. See 102 // the comment on INITIAL_STATE for more. 103 State { 104 val: AtomicUsize::new(INITIAL_STATE), 105 } 106 } 107 108 /// Loads the current state, establishes `Acquire` ordering. load(&self) -> Snapshot109 pub(super) fn load(&self) -> Snapshot { 110 Snapshot(self.val.load(Acquire)) 111 } 112 113 /// Attempts to transition the lifecycle to `Running`. This sets the 114 /// notified bit to false so notifications during the poll can be detected. transition_to_running(&self) -> TransitionToRunning115 pub(super) fn transition_to_running(&self) -> TransitionToRunning { 116 self.fetch_update_action(|mut next| { 117 let action; 118 assert!(next.is_notified()); 119 120 if !next.is_idle() { 121 // This happens if the task is either currently running or if it 122 // has already completed, e.g. if it was cancelled during 123 // shutdown. Consume the ref-count and return. 124 next.ref_dec(); 125 if next.ref_count() == 0 { 126 action = TransitionToRunning::Dealloc; 127 } else { 128 action = TransitionToRunning::Failed; 129 } 130 } else { 131 // We are able to lock the RUNNING bit. 132 next.set_running(); 133 next.unset_notified(); 134 135 if next.is_cancelled() { 136 action = TransitionToRunning::Cancelled; 137 } else { 138 action = TransitionToRunning::Success; 139 } 140 } 141 (action, Some(next)) 142 }) 143 } 144 145 /// Transitions the task from `Running` -> `Idle`. 146 /// 147 /// Returns `true` if the transition to `Idle` is successful, `false` otherwise. 148 /// The transition to `Idle` fails if the task has been flagged to be 149 /// cancelled. transition_to_idle(&self) -> TransitionToIdle150 pub(super) fn transition_to_idle(&self) -> TransitionToIdle { 151 self.fetch_update_action(|curr| { 152 assert!(curr.is_running()); 153 154 if curr.is_cancelled() { 155 return (TransitionToIdle::Cancelled, None); 156 } 157 158 let mut next = curr; 159 let action; 160 next.unset_running(); 161 162 if !next.is_notified() { 163 // Polling the future consumes the ref-count of the Notified. 164 next.ref_dec(); 165 if next.ref_count() == 0 { 166 action = TransitionToIdle::OkDealloc; 167 } else { 168 action = TransitionToIdle::Ok; 169 } 170 } else { 171 // The caller will schedule a new notification, so we create a 172 // new ref-count for the notification. Our own ref-count is kept 173 // for now, and the caller will drop it shortly. 174 next.ref_inc(); 175 action = TransitionToIdle::OkNotified; 176 } 177 178 (action, Some(next)) 179 }) 180 } 181 182 /// Transitions the task from `Running` -> `Complete`. transition_to_complete(&self) -> Snapshot183 pub(super) fn transition_to_complete(&self) -> Snapshot { 184 const DELTA: usize = RUNNING | COMPLETE; 185 186 let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel)); 187 assert!(prev.is_running()); 188 assert!(!prev.is_complete()); 189 190 Snapshot(prev.0 ^ DELTA) 191 } 192 193 /// Transitions from `Complete` -> `Terminal`, decrementing the reference 194 /// count the specified number of times. 195 /// 196 /// Returns true if the task should be deallocated. transition_to_terminal(&self, count: usize) -> bool197 pub(super) fn transition_to_terminal(&self, count: usize) -> bool { 198 let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel)); 199 assert!( 200 prev.ref_count() >= count, 201 "current: {}, sub: {}", 202 prev.ref_count(), 203 count 204 ); 205 prev.ref_count() == count 206 } 207 208 /// Transitions the state to `NOTIFIED`. 209 /// 210 /// If no task needs to be submitted, a ref-count is consumed. 211 /// 212 /// If a task needs to be submitted, the ref-count is incremented for the 213 /// new Notified. transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal214 pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal { 215 self.fetch_update_action(|mut snapshot| { 216 let action; 217 218 if snapshot.is_running() { 219 // If the task is running, we mark it as notified, but we should 220 // not submit anything as the thread currently running the 221 // future is responsible for that. 222 snapshot.set_notified(); 223 snapshot.ref_dec(); 224 225 // The thread that set the running bit also holds a ref-count. 226 assert!(snapshot.ref_count() > 0); 227 228 action = TransitionToNotifiedByVal::DoNothing; 229 } else if snapshot.is_complete() || snapshot.is_notified() { 230 // We do not need to submit any notifications, but we have to 231 // decrement the ref-count. 232 snapshot.ref_dec(); 233 234 if snapshot.ref_count() == 0 { 235 action = TransitionToNotifiedByVal::Dealloc; 236 } else { 237 action = TransitionToNotifiedByVal::DoNothing; 238 } 239 } else { 240 // We create a new notified that we can submit. The caller 241 // retains ownership of the ref-count they passed in. 242 snapshot.set_notified(); 243 snapshot.ref_inc(); 244 action = TransitionToNotifiedByVal::Submit; 245 } 246 247 (action, Some(snapshot)) 248 }) 249 } 250 251 /// Transitions the state to `NOTIFIED`. transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef252 pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef { 253 self.fetch_update_action(|mut snapshot| { 254 if snapshot.is_complete() || snapshot.is_notified() { 255 // There is nothing to do in this case. 256 (TransitionToNotifiedByRef::DoNothing, None) 257 } else if snapshot.is_running() { 258 // If the task is running, we mark it as notified, but we should 259 // not submit as the thread currently running the future is 260 // responsible for that. 261 snapshot.set_notified(); 262 (TransitionToNotifiedByRef::DoNothing, Some(snapshot)) 263 } else { 264 // The task is idle and not notified. We should submit a 265 // notification. 266 snapshot.set_notified(); 267 snapshot.ref_inc(); 268 (TransitionToNotifiedByRef::Submit, Some(snapshot)) 269 } 270 }) 271 } 272 273 /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref count. 274 #[cfg(all( 275 tokio_unstable, 276 tokio_taskdump, 277 feature = "rt", 278 target_os = "linux", 279 any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") 280 ))] transition_to_notified_for_tracing(&self)281 pub(super) fn transition_to_notified_for_tracing(&self) { 282 self.fetch_update_action(|mut snapshot| { 283 snapshot.set_notified(); 284 snapshot.ref_inc(); 285 ((), Some(snapshot)) 286 }); 287 } 288 289 /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle. 290 /// 291 /// Returns `true` if the task needs to be submitted to the pool for 292 /// execution. transition_to_notified_and_cancel(&self) -> bool293 pub(super) fn transition_to_notified_and_cancel(&self) -> bool { 294 self.fetch_update_action(|mut snapshot| { 295 if snapshot.is_cancelled() || snapshot.is_complete() { 296 // Aborts to completed or cancelled tasks are no-ops. 297 (false, None) 298 } else if snapshot.is_running() { 299 // If the task is running, we mark it as cancelled. The thread 300 // running the task will notice the cancelled bit when it 301 // stops polling and it will kill the task. 302 // 303 // The set_notified() call is not strictly necessary but it will 304 // in some cases let a wake_by_ref call return without having 305 // to perform a compare_exchange. 306 snapshot.set_notified(); 307 snapshot.set_cancelled(); 308 (false, Some(snapshot)) 309 } else { 310 // The task is idle. We set the cancelled and notified bits and 311 // submit a notification if the notified bit was not already 312 // set. 313 snapshot.set_cancelled(); 314 if !snapshot.is_notified() { 315 snapshot.set_notified(); 316 snapshot.ref_inc(); 317 (true, Some(snapshot)) 318 } else { 319 (false, Some(snapshot)) 320 } 321 } 322 }) 323 } 324 325 /// Sets the `CANCELLED` bit and attempts to transition to `Running`. 326 /// 327 /// Returns `true` if the transition to `Running` succeeded. transition_to_shutdown(&self) -> bool328 pub(super) fn transition_to_shutdown(&self) -> bool { 329 let mut prev = Snapshot(0); 330 331 let _ = self.fetch_update(|mut snapshot| { 332 prev = snapshot; 333 334 if snapshot.is_idle() { 335 snapshot.set_running(); 336 } 337 338 // If the task was not idle, the thread currently running the task 339 // will notice the cancelled bit and cancel it once the poll 340 // completes. 341 snapshot.set_cancelled(); 342 Some(snapshot) 343 }); 344 345 prev.is_idle() 346 } 347 348 /// Optimistically tries to swap the state assuming the join handle is 349 /// __immediately__ dropped on spawn. drop_join_handle_fast(&self) -> Result<(), ()>350 pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> { 351 use std::sync::atomic::Ordering::Relaxed; 352 353 // Relaxed is acceptable as if this function is called and succeeds, 354 // then nothing has been done w/ the join handle. 355 // 356 // The moment the join handle is used (polled), the `JOIN_WAKER` flag is 357 // set, at which point the CAS will fail. 358 // 359 // Given this, there is no risk if this operation is reordered. 360 self.val 361 .compare_exchange_weak( 362 INITIAL_STATE, 363 (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST, 364 Release, 365 Relaxed, 366 ) 367 .map(|_| ()) 368 .map_err(|_| ()) 369 } 370 371 /// Tries to unset the JOIN_INTEREST flag. 372 /// 373 /// Returns `Ok` if the operation happens before the task transitions to a 374 /// completed state, `Err` otherwise. unset_join_interested(&self) -> UpdateResult375 pub(super) fn unset_join_interested(&self) -> UpdateResult { 376 self.fetch_update(|curr| { 377 assert!(curr.is_join_interested()); 378 379 if curr.is_complete() { 380 return None; 381 } 382 383 let mut next = curr; 384 next.unset_join_interested(); 385 386 Some(next) 387 }) 388 } 389 390 /// Sets the `JOIN_WAKER` bit. 391 /// 392 /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if 393 /// the task has completed. set_join_waker(&self) -> UpdateResult394 pub(super) fn set_join_waker(&self) -> UpdateResult { 395 self.fetch_update(|curr| { 396 assert!(curr.is_join_interested()); 397 assert!(!curr.is_join_waker_set()); 398 399 if curr.is_complete() { 400 return None; 401 } 402 403 let mut next = curr; 404 next.set_join_waker(); 405 406 Some(next) 407 }) 408 } 409 410 /// Unsets the `JOIN_WAKER` bit. 411 /// 412 /// Returns `Ok` has been unset, `Err` otherwise. This operation fails if 413 /// the task has completed. unset_waker(&self) -> UpdateResult414 pub(super) fn unset_waker(&self) -> UpdateResult { 415 self.fetch_update(|curr| { 416 assert!(curr.is_join_interested()); 417 assert!(curr.is_join_waker_set()); 418 419 if curr.is_complete() { 420 return None; 421 } 422 423 let mut next = curr; 424 next.unset_join_waker(); 425 426 Some(next) 427 }) 428 } 429 ref_inc(&self)430 pub(super) fn ref_inc(&self) { 431 use std::process; 432 use std::sync::atomic::Ordering::Relaxed; 433 434 // Using a relaxed ordering is alright here, as knowledge of the 435 // original reference prevents other threads from erroneously deleting 436 // the object. 437 // 438 // As explained in the [Boost documentation][1], Increasing the 439 // reference counter can always be done with memory_order_relaxed: New 440 // references to an object can only be formed from an existing 441 // reference, and passing an existing reference from one thread to 442 // another must already provide any required synchronization. 443 // 444 // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html) 445 let prev = self.val.fetch_add(REF_ONE, Relaxed); 446 447 // If the reference count overflowed, abort. 448 if prev > isize::MAX as usize { 449 process::abort(); 450 } 451 } 452 453 /// Returns `true` if the task should be released. ref_dec(&self) -> bool454 pub(super) fn ref_dec(&self) -> bool { 455 let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel)); 456 assert!(prev.ref_count() >= 1); 457 prev.ref_count() == 1 458 } 459 460 /// Returns `true` if the task should be released. ref_dec_twice(&self) -> bool461 pub(super) fn ref_dec_twice(&self) -> bool { 462 let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel)); 463 assert!(prev.ref_count() >= 2); 464 prev.ref_count() == 2 465 } 466 fetch_update_action<F, T>(&self, mut f: F) -> T where F: FnMut(Snapshot) -> (T, Option<Snapshot>),467 fn fetch_update_action<F, T>(&self, mut f: F) -> T 468 where 469 F: FnMut(Snapshot) -> (T, Option<Snapshot>), 470 { 471 let mut curr = self.load(); 472 473 loop { 474 let (output, next) = f(curr); 475 let next = match next { 476 Some(next) => next, 477 None => return output, 478 }; 479 480 let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire); 481 482 match res { 483 Ok(_) => return output, 484 Err(actual) => curr = Snapshot(actual), 485 } 486 } 487 } 488 fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot> where F: FnMut(Snapshot) -> Option<Snapshot>,489 fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot> 490 where 491 F: FnMut(Snapshot) -> Option<Snapshot>, 492 { 493 let mut curr = self.load(); 494 495 loop { 496 let next = match f(curr) { 497 Some(next) => next, 498 None => return Err(curr), 499 }; 500 501 let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire); 502 503 match res { 504 Ok(_) => return Ok(next), 505 Err(actual) => curr = Snapshot(actual), 506 } 507 } 508 } 509 } 510 511 // ===== impl Snapshot ===== 512 513 impl Snapshot { 514 /// Returns `true` if the task is in an idle state. is_idle(self) -> bool515 pub(super) fn is_idle(self) -> bool { 516 self.0 & (RUNNING | COMPLETE) == 0 517 } 518 519 /// Returns `true` if the task has been flagged as notified. is_notified(self) -> bool520 pub(super) fn is_notified(self) -> bool { 521 self.0 & NOTIFIED == NOTIFIED 522 } 523 unset_notified(&mut self)524 fn unset_notified(&mut self) { 525 self.0 &= !NOTIFIED 526 } 527 set_notified(&mut self)528 fn set_notified(&mut self) { 529 self.0 |= NOTIFIED 530 } 531 is_running(self) -> bool532 pub(super) fn is_running(self) -> bool { 533 self.0 & RUNNING == RUNNING 534 } 535 set_running(&mut self)536 fn set_running(&mut self) { 537 self.0 |= RUNNING; 538 } 539 unset_running(&mut self)540 fn unset_running(&mut self) { 541 self.0 &= !RUNNING; 542 } 543 is_cancelled(self) -> bool544 pub(super) fn is_cancelled(self) -> bool { 545 self.0 & CANCELLED == CANCELLED 546 } 547 set_cancelled(&mut self)548 fn set_cancelled(&mut self) { 549 self.0 |= CANCELLED; 550 } 551 552 /// Returns `true` if the task's future has completed execution. is_complete(self) -> bool553 pub(super) fn is_complete(self) -> bool { 554 self.0 & COMPLETE == COMPLETE 555 } 556 is_join_interested(self) -> bool557 pub(super) fn is_join_interested(self) -> bool { 558 self.0 & JOIN_INTEREST == JOIN_INTEREST 559 } 560 unset_join_interested(&mut self)561 fn unset_join_interested(&mut self) { 562 self.0 &= !JOIN_INTEREST 563 } 564 is_join_waker_set(self) -> bool565 pub(super) fn is_join_waker_set(self) -> bool { 566 self.0 & JOIN_WAKER == JOIN_WAKER 567 } 568 set_join_waker(&mut self)569 fn set_join_waker(&mut self) { 570 self.0 |= JOIN_WAKER; 571 } 572 unset_join_waker(&mut self)573 fn unset_join_waker(&mut self) { 574 self.0 &= !JOIN_WAKER 575 } 576 ref_count(self) -> usize577 pub(super) fn ref_count(self) -> usize { 578 (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT 579 } 580 ref_inc(&mut self)581 fn ref_inc(&mut self) { 582 assert!(self.0 <= isize::MAX as usize); 583 self.0 += REF_ONE; 584 } 585 ref_dec(&mut self)586 pub(super) fn ref_dec(&mut self) { 587 assert!(self.ref_count() > 0); 588 self.0 -= REF_ONE 589 } 590 } 591 592 impl fmt::Debug for State { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result593 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 594 let snapshot = self.load(); 595 snapshot.fmt(fmt) 596 } 597 } 598 599 impl fmt::Debug for Snapshot { fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result600 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { 601 fmt.debug_struct("Snapshot") 602 .field("is_running", &self.is_running()) 603 .field("is_complete", &self.is_complete()) 604 .field("is_notified", &self.is_notified()) 605 .field("is_cancelled", &self.is_cancelled()) 606 .field("is_join_interested", &self.is_join_interested()) 607 .field("is_join_waker_set", &self.is_join_waker_set()) 608 .field("ref_count", &self.ref_count()) 609 .finish() 610 } 611 } 612