1 use std::cell::UnsafeCell; 2 use std::sync::atomic::AtomicUsize; 3 use std::sync::atomic::Ordering::SeqCst; 4 5 /// A "lock" around data `D`, which employs a *helping* strategy. 6 /// 7 /// Used to ensure that concurrent `unpark` invocations lead to (1) `poll` being 8 /// invoked on only a single thread at a time (2) `poll` being invoked at least 9 /// once after each `unpark` (unless the future has completed). 10 pub(crate) struct UnparkMutex<D> { 11 // The state of task execution (state machine described below) 12 status: AtomicUsize, 13 14 // The actual task data, accessible only in the POLLING state 15 inner: UnsafeCell<Option<D>>, 16 } 17 18 // `UnparkMutex<D>` functions in many ways like a `Mutex<D>`, except that on 19 // acquisition failure, the current lock holder performs the desired work -- 20 // re-polling. 21 // 22 // As such, these impls mirror those for `Mutex<D>`. In particular, a reference 23 // to `UnparkMutex` can be used to gain `&mut` access to the inner data, which 24 // must therefore be `Send`. 25 unsafe impl<D: Send> Send for UnparkMutex<D> {} 26 unsafe impl<D: Send> Sync for UnparkMutex<D> {} 27 28 // There are four possible task states, listed below with their possible 29 // transitions: 30 31 // The task is blocked, waiting on an event 32 const WAITING: usize = 0; // --> POLLING 33 34 // The task is actively being polled by a thread; arrival of additional events 35 // of interest should move it to the REPOLL state 36 const POLLING: usize = 1; // --> WAITING, REPOLL, or COMPLETE 37 38 // The task is actively being polled, but will need to be re-polled upon 39 // completion to ensure that all events were observed. 40 const REPOLL: usize = 2; // --> POLLING 41 42 // The task has finished executing (either successfully or with an error/panic) 43 const COMPLETE: usize = 3; // No transitions out 44 45 impl<D> UnparkMutex<D> { new() -> Self46 pub(crate) fn new() -> Self { 47 Self { 48 status: AtomicUsize::new(WAITING), 49 inner: UnsafeCell::new(None), 50 } 51 } 52 53 /// Attempt to "notify" the mutex that a poll should occur. 54 /// 55 /// An `Ok` result indicates that the `POLLING` state has been entered, and 56 /// the caller can proceed to poll the future. An `Err` result indicates 57 /// that polling is not necessary (because the task is finished or the 58 /// polling has been delegated). notify(&self) -> Result<D, ()>59 pub(crate) fn notify(&self) -> Result<D, ()> { 60 let mut status = self.status.load(SeqCst); 61 loop { 62 match status { 63 // The task is idle, so try to run it immediately. 64 WAITING => { 65 match self.status.compare_exchange(WAITING, POLLING, 66 SeqCst, SeqCst) { 67 Ok(_) => { 68 let data = unsafe { 69 // SAFETY: we've ensured mutual exclusion via 70 // the status protocol; we are the only thread 71 // that has transitioned to the POLLING state, 72 // and we won't transition back to QUEUED until 73 // the lock is "released" by this thread. See 74 // the protocol diagram above. 75 (*self.inner.get()).take().unwrap() 76 }; 77 return Ok(data); 78 } 79 Err(cur) => status = cur, 80 } 81 } 82 83 // The task is being polled, so we need to record that it should 84 // be *repolled* when complete. 85 POLLING => { 86 match self.status.compare_exchange(POLLING, REPOLL, 87 SeqCst, SeqCst) { 88 Ok(_) => return Err(()), 89 Err(cur) => status = cur, 90 } 91 } 92 93 // The task is already scheduled for polling, or is complete, so 94 // we've got nothing to do. 95 _ => return Err(()), 96 } 97 } 98 } 99 100 /// Alert the mutex that polling is about to begin, clearing any accumulated 101 /// re-poll requests. 102 /// 103 /// # Safety 104 /// 105 /// Callable only from the `POLLING`/`REPOLL` states, i.e. between 106 /// successful calls to `notify` and `wait`/`complete`. start_poll(&self)107 pub(crate) unsafe fn start_poll(&self) { 108 self.status.store(POLLING, SeqCst); 109 } 110 111 /// Alert the mutex that polling completed with `Pending`. 112 /// 113 /// # Safety 114 /// 115 /// Callable only from the `POLLING`/`REPOLL` states, i.e. between 116 /// successful calls to `notify` and `wait`/`complete`. wait(&self, data: D) -> Result<(), D>117 pub(crate) unsafe fn wait(&self, data: D) -> Result<(), D> { 118 *self.inner.get() = Some(data); 119 120 match self.status.compare_exchange(POLLING, WAITING, SeqCst, SeqCst) { 121 // no unparks came in while we were running 122 Ok(_) => Ok(()), 123 124 // guaranteed to be in REPOLL state; just clobber the 125 // state and run again. 126 Err(status) => { 127 assert_eq!(status, REPOLL); 128 self.status.store(POLLING, SeqCst); 129 Err((*self.inner.get()).take().unwrap()) 130 } 131 } 132 } 133 134 /// Alert the mutex that the task has completed execution and should not be 135 /// notified again. 136 /// 137 /// # Safety 138 /// 139 /// Callable only from the `POLLING`/`REPOLL` states, i.e. between 140 /// successful calls to `notify` and `wait`/`complete`. complete(&self)141 pub(crate) unsafe fn complete(&self) { 142 self.status.store(COMPLETE, SeqCst); 143 } 144 } 145