• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use std::cell::UnsafeCell;
2 use std::sync::atomic::AtomicUsize;
3 use std::sync::atomic::Ordering::SeqCst;
4 
5 /// A "lock" around data `D`, which employs a *helping* strategy.
6 ///
7 /// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being
8 /// invoked on only a single thread at a time (2) `poll` being invoked at least
9 /// once after each `unpark` (unless the future has completed).
10 pub(crate) struct UnparkMutex<D> {
11     // The state of task execution (state machine described below)
12     status: AtomicUsize,
13 
14     // The actual task data, accessible only in the POLLING state
15     inner: UnsafeCell<Option<D>>,
16 }
17 
18 // `UnparkMutex<D>` functions in many ways like a `Mutex<D>`, except that on
19 // acquisition failure, the current lock holder performs the desired work --
20 // re-polling.
21 //
22 // As such, these impls mirror those for `Mutex<D>`. In particular, a reference
23 // to `UnparkMutex` can be used to gain `&mut` access to the inner data, which
24 // must therefore be `Send`.
25 unsafe impl<D: Send> Send for UnparkMutex<D> {}
26 unsafe impl<D: Send> Sync for UnparkMutex<D> {}
27 
28 // There are four possible task states, listed below with their possible
29 // transitions:
30 
31 // The task is blocked, waiting on an event
32 const WAITING: usize = 0;       // --> POLLING
33 
34 // The task is actively being polled by a thread; arrival of additional events
35 // of interest should move it to the REPOLL state
36 const POLLING: usize = 1;       // --> WAITING, REPOLL, or COMPLETE
37 
38 // The task is actively being polled, but will need to be re-polled upon
39 // completion to ensure that all events were observed.
40 const REPOLL: usize = 2;        // --> POLLING
41 
42 // The task has finished executing (either successfully or with an error/panic)
43 const COMPLETE: usize = 3;      // No transitions out
44 
45 impl<D> UnparkMutex<D> {
new() -> Self46     pub(crate) fn new() -> Self {
47         Self {
48             status: AtomicUsize::new(WAITING),
49             inner: UnsafeCell::new(None),
50         }
51     }
52 
53     /// Attempt to "notify" the mutex that a poll should occur.
54     ///
55     /// An `Ok` result indicates that the `POLLING` state has been entered, and
56     /// the caller can proceed to poll the future. An `Err` result indicates
57     /// that polling is not necessary (because the task is finished or the
58     /// polling has been delegated).
notify(&self) -> Result<D, ()>59     pub(crate) fn notify(&self) -> Result<D, ()> {
60         let mut status = self.status.load(SeqCst);
61         loop {
62             match status {
63                 // The task is idle, so try to run it immediately.
64                 WAITING => {
65                     match self.status.compare_exchange(WAITING, POLLING,
66                                                        SeqCst, SeqCst) {
67                         Ok(_) => {
68                             let data = unsafe {
69                                 // SAFETY: we've ensured mutual exclusion via
70                                 // the status protocol; we are the only thread
71                                 // that has transitioned to the POLLING state,
72                                 // and we won't transition back to QUEUED until
73                                 // the lock is "released" by this thread. See
74                                 // the protocol diagram above.
75                                 (*self.inner.get()).take().unwrap()
76                             };
77                             return Ok(data);
78                         }
79                         Err(cur) => status = cur,
80                     }
81                 }
82 
83                 // The task is being polled, so we need to record that it should
84                 // be *repolled* when complete.
85                 POLLING => {
86                     match self.status.compare_exchange(POLLING, REPOLL,
87                                                        SeqCst, SeqCst) {
88                         Ok(_) => return Err(()),
89                         Err(cur) => status = cur,
90                     }
91                 }
92 
93                 // The task is already scheduled for polling, or is complete, so
94                 // we've got nothing to do.
95                 _ => return Err(()),
96             }
97         }
98     }
99 
100     /// Alert the mutex that polling is about to begin, clearing any accumulated
101     /// re-poll requests.
102     ///
103     /// # Safety
104     ///
105     /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
106     /// successful calls to `notify` and `wait`/`complete`.
start_poll(&self)107     pub(crate) unsafe fn start_poll(&self) {
108         self.status.store(POLLING, SeqCst);
109     }
110 
111     /// Alert the mutex that polling completed with `Pending`.
112     ///
113     /// # Safety
114     ///
115     /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
116     /// successful calls to `notify` and `wait`/`complete`.
wait(&self, data: D) -> Result<(), D>117     pub(crate) unsafe fn wait(&self, data: D) -> Result<(), D> {
118         *self.inner.get() = Some(data);
119 
120         match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) {
121             // no unparks came in while we were running
122             Ok(_) => Ok(()),
123 
124             // guaranteed to be in REPOLL state; just clobber the
125             // state and run again.
126             Err(status) => {
127                 assert_eq!(status, REPOLL);
128                 self.status.store(POLLING, SeqCst);
129                 Err((*self.inner.get()).take().unwrap())
130             }
131         }
132     }
133 
134     /// Alert the mutex that the task has completed execution and should not be
135     /// notified again.
136     ///
137     /// # Safety
138     ///
139     /// Callable only from the `POLLING`/`REPOLL` states, i.e. between
140     /// successful calls to `notify` and `wait`/`complete`.
complete(&self)141     pub(crate) unsafe fn complete(&self) {
142         self.status.store(COMPLETE, SeqCst);
143     }
144 }
145