• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::loom::sync::atomic::AtomicUsize;
2 
3 use std::fmt;
4 use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
5 use std::usize;
6 
/// Atomic state word for a task.
///
/// A single `usize` packs the lifecycle and flag bits (see `STATE_MASK`) into
/// the low bits and the reference count (see `REF_COUNT_MASK`) into the
/// remaining high bits.
pub(super) struct State {
    /// Packed flag bits and reference count.
    val: AtomicUsize,
}
10 
/// Current state value.
///
/// A plain copy of the packed state word; the flag bits and the reference
/// count are decoded through the accessor methods on this type.
#[derive(Copy, Clone)]
pub(super) struct Snapshot(usize);
14 
/// Outcome of `State::fetch_update`: `Ok` carries the snapshot that was
/// stored, `Err` carries the snapshot that was current when the update was
/// declined.
type UpdateResult = Result<Snapshot, Snapshot>;
16 
/// Set while the task is being run.
const RUNNING: usize = 1;

/// Set once the task is complete.
///
/// This bit is never cleared after it has been set.
const COMPLETE: usize = 1 << 1;

/// Mask selecting the lifecycle bits of the state.
const LIFECYCLE_MASK: usize = RUNNING | COMPLETE;

/// Set when the task has been pushed into a run queue.
const NOTIFIED: usize = 1 << 2;

/// Set while the join handle is still around.
const JOIN_INTEREST: usize = 1 << 3;

/// Set when a join handle waker has been stored.
const JOIN_WAKER: usize = 1 << 4;

/// Set when the task has been forcibly cancelled.
const CANCELLED: usize = 1 << 5;

/// Every flag bit.
const STATE_MASK: usize = LIFECYCLE_MASK | NOTIFIED | JOIN_INTEREST | JOIN_WAKER | CANCELLED;

/// Bits holding the ref count: everything that is not a flag.
const REF_COUNT_MASK: usize = !STATE_MASK;

/// Number of positions the ref count is shifted by, i.e. the number of flag
/// bits below it.
const REF_COUNT_SHIFT: usize = STATE_MASK.count_ones() as usize;

/// A single reference, expressed in state-word units.
const REF_ONE: usize = 1 << REF_COUNT_SHIFT;

/// State every task starts in.
///
/// A new task holds three references:
///
///  * One that will be stored in an OwnedTasks or LocalOwnedTasks.
///  * One that will be sent to the scheduler as an ordinary notification.
///  * One for the JoinHandle.
///
/// The task starts with a `JoinHandle`, so `JOIN_INTEREST` is set.
/// The task starts with a `Notified`, so `NOTIFIED` is set.
const INITIAL_STATE: usize = NOTIFIED | JOIN_INTEREST | (3 * REF_ONE);
66 
/// Outcome of `State::transition_to_running`.
#[must_use]
pub(super) enum TransitionToRunning {
    /// The task was idle; the running bit is now set and the task is not
    /// cancelled.
    Success,
    /// The task was idle and the running bit is now set, but the cancelled
    /// bit is also set.
    Cancelled,
    /// The task was not idle; one reference was dropped and others remain.
    Failed,
    /// The task was not idle and dropping one reference brought the count to
    /// zero.
    Dealloc,
}
74 
/// Outcome of `State::transition_to_idle`.
#[must_use]
pub(super) enum TransitionToIdle {
    /// The running bit was cleared, one reference was dropped and others
    /// remain.
    Ok,
    /// The running bit was cleared while the notified bit was set; a
    /// reference was added for the notification the caller will schedule.
    OkNotified,
    /// The running bit was cleared and dropping one reference brought the
    /// count to zero.
    OkDealloc,
    /// The cancelled bit was set; the state was left unmodified.
    Cancelled,
}
82 
/// Outcome of `State::transition_to_notified_by_val`.
#[must_use]
pub(super) enum TransitionToNotifiedByVal {
    /// Nothing has to be submitted; one reference was consumed and others
    /// remain.
    DoNothing,
    /// The notified bit was set and a reference was added; the task should be
    /// submitted.
    Submit,
    /// Nothing has to be submitted and consuming the reference brought the
    /// count to zero.
    Dealloc,
}
89 
/// Outcome of `State::transition_to_notified_by_ref`.
#[must_use]
pub(crate) enum TransitionToNotifiedByRef {
    /// The task is complete, already notified, or currently running; nothing
    /// has to be submitted.
    DoNothing,
    /// The task was idle and not notified; the notified bit was set and a
    /// reference was added, so the task should be submitted.
    Submit,
}
95 
96 /// All transitions are performed via RMW operations. This establishes an
97 /// unambiguous modification order.
98 impl State {
99     /// Returns a task's initial state.
new() -> State100     pub(super) fn new() -> State {
101         // The raw task returned by this method has a ref-count of three. See
102         // the comment on INITIAL_STATE for more.
103         State {
104             val: AtomicUsize::new(INITIAL_STATE),
105         }
106     }
107 
108     /// Loads the current state, establishes `Acquire` ordering.
load(&self) -> Snapshot109     pub(super) fn load(&self) -> Snapshot {
110         Snapshot(self.val.load(Acquire))
111     }
112 
113     /// Attempts to transition the lifecycle to `Running`. This sets the
114     /// notified bit to false so notifications during the poll can be detected.
transition_to_running(&self) -> TransitionToRunning115     pub(super) fn transition_to_running(&self) -> TransitionToRunning {
116         self.fetch_update_action(|mut next| {
117             let action;
118             assert!(next.is_notified());
119 
120             if !next.is_idle() {
121                 // This happens if the task is either currently running or if it
122                 // has already completed, e.g. if it was cancelled during
123                 // shutdown. Consume the ref-count and return.
124                 next.ref_dec();
125                 if next.ref_count() == 0 {
126                     action = TransitionToRunning::Dealloc;
127                 } else {
128                     action = TransitionToRunning::Failed;
129                 }
130             } else {
131                 // We are able to lock the RUNNING bit.
132                 next.set_running();
133                 next.unset_notified();
134 
135                 if next.is_cancelled() {
136                     action = TransitionToRunning::Cancelled;
137                 } else {
138                     action = TransitionToRunning::Success;
139                 }
140             }
141             (action, Some(next))
142         })
143     }
144 
145     /// Transitions the task from `Running` -> `Idle`.
146     ///
147     /// Returns `true` if the transition to `Idle` is successful, `false` otherwise.
148     /// The transition to `Idle` fails if the task has been flagged to be
149     /// cancelled.
transition_to_idle(&self) -> TransitionToIdle150     pub(super) fn transition_to_idle(&self) -> TransitionToIdle {
151         self.fetch_update_action(|curr| {
152             assert!(curr.is_running());
153 
154             if curr.is_cancelled() {
155                 return (TransitionToIdle::Cancelled, None);
156             }
157 
158             let mut next = curr;
159             let action;
160             next.unset_running();
161 
162             if !next.is_notified() {
163                 // Polling the future consumes the ref-count of the Notified.
164                 next.ref_dec();
165                 if next.ref_count() == 0 {
166                     action = TransitionToIdle::OkDealloc;
167                 } else {
168                     action = TransitionToIdle::Ok;
169                 }
170             } else {
171                 // The caller will schedule a new notification, so we create a
172                 // new ref-count for the notification. Our own ref-count is kept
173                 // for now, and the caller will drop it shortly.
174                 next.ref_inc();
175                 action = TransitionToIdle::OkNotified;
176             }
177 
178             (action, Some(next))
179         })
180     }
181 
182     /// Transitions the task from `Running` -> `Complete`.
transition_to_complete(&self) -> Snapshot183     pub(super) fn transition_to_complete(&self) -> Snapshot {
184         const DELTA: usize = RUNNING | COMPLETE;
185 
186         let prev = Snapshot(self.val.fetch_xor(DELTA, AcqRel));
187         assert!(prev.is_running());
188         assert!(!prev.is_complete());
189 
190         Snapshot(prev.0 ^ DELTA)
191     }
192 
193     /// Transitions from `Complete` -> `Terminal`, decrementing the reference
194     /// count the specified number of times.
195     ///
196     /// Returns true if the task should be deallocated.
transition_to_terminal(&self, count: usize) -> bool197     pub(super) fn transition_to_terminal(&self, count: usize) -> bool {
198         let prev = Snapshot(self.val.fetch_sub(count * REF_ONE, AcqRel));
199         assert!(
200             prev.ref_count() >= count,
201             "current: {}, sub: {}",
202             prev.ref_count(),
203             count
204         );
205         prev.ref_count() == count
206     }
207 
208     /// Transitions the state to `NOTIFIED`.
209     ///
210     /// If no task needs to be submitted, a ref-count is consumed.
211     ///
212     /// If a task needs to be submitted, the ref-count is incremented for the
213     /// new Notified.
transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal214     pub(super) fn transition_to_notified_by_val(&self) -> TransitionToNotifiedByVal {
215         self.fetch_update_action(|mut snapshot| {
216             let action;
217 
218             if snapshot.is_running() {
219                 // If the task is running, we mark it as notified, but we should
220                 // not submit anything as the thread currently running the
221                 // future is responsible for that.
222                 snapshot.set_notified();
223                 snapshot.ref_dec();
224 
225                 // The thread that set the running bit also holds a ref-count.
226                 assert!(snapshot.ref_count() > 0);
227 
228                 action = TransitionToNotifiedByVal::DoNothing;
229             } else if snapshot.is_complete() || snapshot.is_notified() {
230                 // We do not need to submit any notifications, but we have to
231                 // decrement the ref-count.
232                 snapshot.ref_dec();
233 
234                 if snapshot.ref_count() == 0 {
235                     action = TransitionToNotifiedByVal::Dealloc;
236                 } else {
237                     action = TransitionToNotifiedByVal::DoNothing;
238                 }
239             } else {
240                 // We create a new notified that we can submit. The caller
241                 // retains ownership of the ref-count they passed in.
242                 snapshot.set_notified();
243                 snapshot.ref_inc();
244                 action = TransitionToNotifiedByVal::Submit;
245             }
246 
247             (action, Some(snapshot))
248         })
249     }
250 
251     /// Transitions the state to `NOTIFIED`.
transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef252     pub(super) fn transition_to_notified_by_ref(&self) -> TransitionToNotifiedByRef {
253         self.fetch_update_action(|mut snapshot| {
254             if snapshot.is_complete() || snapshot.is_notified() {
255                 // There is nothing to do in this case.
256                 (TransitionToNotifiedByRef::DoNothing, None)
257             } else if snapshot.is_running() {
258                 // If the task is running, we mark it as notified, but we should
259                 // not submit as the thread currently running the future is
260                 // responsible for that.
261                 snapshot.set_notified();
262                 (TransitionToNotifiedByRef::DoNothing, Some(snapshot))
263             } else {
264                 // The task is idle and not notified. We should submit a
265                 // notification.
266                 snapshot.set_notified();
267                 snapshot.ref_inc();
268                 (TransitionToNotifiedByRef::Submit, Some(snapshot))
269             }
270         })
271     }
272 
273     /// Transitions the state to `NOTIFIED`, unconditionally increasing the ref count.
274     #[cfg(all(
275         tokio_unstable,
276         tokio_taskdump,
277         feature = "rt",
278         target_os = "linux",
279         any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
280     ))]
transition_to_notified_for_tracing(&self)281     pub(super) fn transition_to_notified_for_tracing(&self) {
282         self.fetch_update_action(|mut snapshot| {
283             snapshot.set_notified();
284             snapshot.ref_inc();
285             ((), Some(snapshot))
286         });
287     }
288 
289     /// Sets the cancelled bit and transitions the state to `NOTIFIED` if idle.
290     ///
291     /// Returns `true` if the task needs to be submitted to the pool for
292     /// execution.
transition_to_notified_and_cancel(&self) -> bool293     pub(super) fn transition_to_notified_and_cancel(&self) -> bool {
294         self.fetch_update_action(|mut snapshot| {
295             if snapshot.is_cancelled() || snapshot.is_complete() {
296                 // Aborts to completed or cancelled tasks are no-ops.
297                 (false, None)
298             } else if snapshot.is_running() {
299                 // If the task is running, we mark it as cancelled. The thread
300                 // running the task will notice the cancelled bit when it
301                 // stops polling and it will kill the task.
302                 //
303                 // The set_notified() call is not strictly necessary but it will
304                 // in some cases let a wake_by_ref call return without having
305                 // to perform a compare_exchange.
306                 snapshot.set_notified();
307                 snapshot.set_cancelled();
308                 (false, Some(snapshot))
309             } else {
310                 // The task is idle. We set the cancelled and notified bits and
311                 // submit a notification if the notified bit was not already
312                 // set.
313                 snapshot.set_cancelled();
314                 if !snapshot.is_notified() {
315                     snapshot.set_notified();
316                     snapshot.ref_inc();
317                     (true, Some(snapshot))
318                 } else {
319                     (false, Some(snapshot))
320                 }
321             }
322         })
323     }
324 
325     /// Sets the `CANCELLED` bit and attempts to transition to `Running`.
326     ///
327     /// Returns `true` if the transition to `Running` succeeded.
transition_to_shutdown(&self) -> bool328     pub(super) fn transition_to_shutdown(&self) -> bool {
329         let mut prev = Snapshot(0);
330 
331         let _ = self.fetch_update(|mut snapshot| {
332             prev = snapshot;
333 
334             if snapshot.is_idle() {
335                 snapshot.set_running();
336             }
337 
338             // If the task was not idle, the thread currently running the task
339             // will notice the cancelled bit and cancel it once the poll
340             // completes.
341             snapshot.set_cancelled();
342             Some(snapshot)
343         });
344 
345         prev.is_idle()
346     }
347 
348     /// Optimistically tries to swap the state assuming the join handle is
349     /// __immediately__ dropped on spawn.
drop_join_handle_fast(&self) -> Result<(), ()>350     pub(super) fn drop_join_handle_fast(&self) -> Result<(), ()> {
351         use std::sync::atomic::Ordering::Relaxed;
352 
353         // Relaxed is acceptable as if this function is called and succeeds,
354         // then nothing has been done w/ the join handle.
355         //
356         // The moment the join handle is used (polled), the `JOIN_WAKER` flag is
357         // set, at which point the CAS will fail.
358         //
359         // Given this, there is no risk if this operation is reordered.
360         self.val
361             .compare_exchange_weak(
362                 INITIAL_STATE,
363                 (INITIAL_STATE - REF_ONE) & !JOIN_INTEREST,
364                 Release,
365                 Relaxed,
366             )
367             .map(|_| ())
368             .map_err(|_| ())
369     }
370 
371     /// Tries to unset the JOIN_INTEREST flag.
372     ///
373     /// Returns `Ok` if the operation happens before the task transitions to a
374     /// completed state, `Err` otherwise.
unset_join_interested(&self) -> UpdateResult375     pub(super) fn unset_join_interested(&self) -> UpdateResult {
376         self.fetch_update(|curr| {
377             assert!(curr.is_join_interested());
378 
379             if curr.is_complete() {
380                 return None;
381             }
382 
383             let mut next = curr;
384             next.unset_join_interested();
385 
386             Some(next)
387         })
388     }
389 
390     /// Sets the `JOIN_WAKER` bit.
391     ///
392     /// Returns `Ok` if the bit is set, `Err` otherwise. This operation fails if
393     /// the task has completed.
set_join_waker(&self) -> UpdateResult394     pub(super) fn set_join_waker(&self) -> UpdateResult {
395         self.fetch_update(|curr| {
396             assert!(curr.is_join_interested());
397             assert!(!curr.is_join_waker_set());
398 
399             if curr.is_complete() {
400                 return None;
401             }
402 
403             let mut next = curr;
404             next.set_join_waker();
405 
406             Some(next)
407         })
408     }
409 
410     /// Unsets the `JOIN_WAKER` bit.
411     ///
412     /// Returns `Ok` has been unset, `Err` otherwise. This operation fails if
413     /// the task has completed.
unset_waker(&self) -> UpdateResult414     pub(super) fn unset_waker(&self) -> UpdateResult {
415         self.fetch_update(|curr| {
416             assert!(curr.is_join_interested());
417             assert!(curr.is_join_waker_set());
418 
419             if curr.is_complete() {
420                 return None;
421             }
422 
423             let mut next = curr;
424             next.unset_join_waker();
425 
426             Some(next)
427         })
428     }
429 
ref_inc(&self)430     pub(super) fn ref_inc(&self) {
431         use std::process;
432         use std::sync::atomic::Ordering::Relaxed;
433 
434         // Using a relaxed ordering is alright here, as knowledge of the
435         // original reference prevents other threads from erroneously deleting
436         // the object.
437         //
438         // As explained in the [Boost documentation][1], Increasing the
439         // reference counter can always be done with memory_order_relaxed: New
440         // references to an object can only be formed from an existing
441         // reference, and passing an existing reference from one thread to
442         // another must already provide any required synchronization.
443         //
444         // [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
445         let prev = self.val.fetch_add(REF_ONE, Relaxed);
446 
447         // If the reference count overflowed, abort.
448         if prev > isize::MAX as usize {
449             process::abort();
450         }
451     }
452 
453     /// Returns `true` if the task should be released.
ref_dec(&self) -> bool454     pub(super) fn ref_dec(&self) -> bool {
455         let prev = Snapshot(self.val.fetch_sub(REF_ONE, AcqRel));
456         assert!(prev.ref_count() >= 1);
457         prev.ref_count() == 1
458     }
459 
460     /// Returns `true` if the task should be released.
ref_dec_twice(&self) -> bool461     pub(super) fn ref_dec_twice(&self) -> bool {
462         let prev = Snapshot(self.val.fetch_sub(2 * REF_ONE, AcqRel));
463         assert!(prev.ref_count() >= 2);
464         prev.ref_count() == 2
465     }
466 
fetch_update_action<F, T>(&self, mut f: F) -> T where F: FnMut(Snapshot) -> (T, Option<Snapshot>),467     fn fetch_update_action<F, T>(&self, mut f: F) -> T
468     where
469         F: FnMut(Snapshot) -> (T, Option<Snapshot>),
470     {
471         let mut curr = self.load();
472 
473         loop {
474             let (output, next) = f(curr);
475             let next = match next {
476                 Some(next) => next,
477                 None => return output,
478             };
479 
480             let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
481 
482             match res {
483                 Ok(_) => return output,
484                 Err(actual) => curr = Snapshot(actual),
485             }
486         }
487     }
488 
fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot> where F: FnMut(Snapshot) -> Option<Snapshot>,489     fn fetch_update<F>(&self, mut f: F) -> Result<Snapshot, Snapshot>
490     where
491         F: FnMut(Snapshot) -> Option<Snapshot>,
492     {
493         let mut curr = self.load();
494 
495         loop {
496             let next = match f(curr) {
497                 Some(next) => next,
498                 None => return Err(curr),
499             };
500 
501             let res = self.val.compare_exchange(curr.0, next.0, AcqRel, Acquire);
502 
503             match res {
504                 Ok(_) => return Ok(next),
505                 Err(actual) => curr = Snapshot(actual),
506             }
507         }
508     }
509 }
510 
511 // ===== impl Snapshot =====
512 
513 impl Snapshot {
514     /// Returns `true` if the task is in an idle state.
is_idle(self) -> bool515     pub(super) fn is_idle(self) -> bool {
516         self.0 & (RUNNING | COMPLETE) == 0
517     }
518 
519     /// Returns `true` if the task has been flagged as notified.
is_notified(self) -> bool520     pub(super) fn is_notified(self) -> bool {
521         self.0 & NOTIFIED == NOTIFIED
522     }
523 
unset_notified(&mut self)524     fn unset_notified(&mut self) {
525         self.0 &= !NOTIFIED
526     }
527 
set_notified(&mut self)528     fn set_notified(&mut self) {
529         self.0 |= NOTIFIED
530     }
531 
is_running(self) -> bool532     pub(super) fn is_running(self) -> bool {
533         self.0 & RUNNING == RUNNING
534     }
535 
set_running(&mut self)536     fn set_running(&mut self) {
537         self.0 |= RUNNING;
538     }
539 
unset_running(&mut self)540     fn unset_running(&mut self) {
541         self.0 &= !RUNNING;
542     }
543 
is_cancelled(self) -> bool544     pub(super) fn is_cancelled(self) -> bool {
545         self.0 & CANCELLED == CANCELLED
546     }
547 
set_cancelled(&mut self)548     fn set_cancelled(&mut self) {
549         self.0 |= CANCELLED;
550     }
551 
552     /// Returns `true` if the task's future has completed execution.
is_complete(self) -> bool553     pub(super) fn is_complete(self) -> bool {
554         self.0 & COMPLETE == COMPLETE
555     }
556 
is_join_interested(self) -> bool557     pub(super) fn is_join_interested(self) -> bool {
558         self.0 & JOIN_INTEREST == JOIN_INTEREST
559     }
560 
unset_join_interested(&mut self)561     fn unset_join_interested(&mut self) {
562         self.0 &= !JOIN_INTEREST
563     }
564 
is_join_waker_set(self) -> bool565     pub(super) fn is_join_waker_set(self) -> bool {
566         self.0 & JOIN_WAKER == JOIN_WAKER
567     }
568 
set_join_waker(&mut self)569     fn set_join_waker(&mut self) {
570         self.0 |= JOIN_WAKER;
571     }
572 
unset_join_waker(&mut self)573     fn unset_join_waker(&mut self) {
574         self.0 &= !JOIN_WAKER
575     }
576 
ref_count(self) -> usize577     pub(super) fn ref_count(self) -> usize {
578         (self.0 & REF_COUNT_MASK) >> REF_COUNT_SHIFT
579     }
580 
ref_inc(&mut self)581     fn ref_inc(&mut self) {
582         assert!(self.0 <= isize::MAX as usize);
583         self.0 += REF_ONE;
584     }
585 
ref_dec(&mut self)586     pub(super) fn ref_dec(&mut self) {
587         assert!(self.ref_count() > 0);
588         self.0 -= REF_ONE
589     }
590 }
591 
592 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result593     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
594         let snapshot = self.load();
595         snapshot.fmt(fmt)
596     }
597 }
598 
599 impl fmt::Debug for Snapshot {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result600     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
601         fmt.debug_struct("Snapshot")
602             .field("is_running", &self.is_running())
603             .field("is_complete", &self.is_complete())
604             .field("is_notified", &self.is_notified())
605             .field("is_cancelled", &self.is_cancelled())
606             .field("is_join_interested", &self.is_join_interested())
607             .field("is_join_waker_set", &self.is_join_waker_set())
608             .field("ref_count", &self.ref_count())
609             .finish()
610     }
611 }
612