• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::cell::UnsafeCell;
6 use std::future::Future;
7 use std::mem;
8 use std::pin::Pin;
9 use std::ptr::NonNull;
10 use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
11 use std::sync::Arc;
12 use std::task::{Context, Poll, Waker};
13 
14 use intrusive_collections::linked_list::{LinkedList, LinkedListOps};
15 use intrusive_collections::{intrusive_adapter, DefaultLinkOps, LinkOps};
16 
17 use crate::sync::SpinLock;
18 
// An atomic version of a LinkedListLink. See https://github.com/Amanieu/intrusive-rs/issues/47 for
// more details.
//
// The `prev` and `next` pointers are only read and written through the unsafe `LinkedListOps`
// methods implemented for `AtomicLinkOps`. The `linked` flag is the atomic that
// `acquire_link`/`release_link` use to claim and release the link.
pub struct AtomicLink {
    // Pointer to the previous link, or `None` if there is none.
    prev: UnsafeCell<Option<NonNull<AtomicLink>>>,
    // Pointer to the next link, or `None` if there is none.
    next: UnsafeCell<Option<NonNull<AtomicLink>>>,
    // Set by `acquire_link` and cleared by `release_link`.
    linked: AtomicBool,
}
26 
27 impl AtomicLink {
new() -> AtomicLink28     fn new() -> AtomicLink {
29         AtomicLink {
30             linked: AtomicBool::new(false),
31             prev: UnsafeCell::new(None),
32             next: UnsafeCell::new(None),
33         }
34     }
35 
is_linked(&self) -> bool36     fn is_linked(&self) -> bool {
37         self.linked.load(Ordering::Relaxed)
38     }
39 }
40 
// Selects `AtomicLinkOps` as the default link operations type for `AtomicLink`.
impl DefaultLinkOps for AtomicLink {
    type Ops = AtomicLinkOps;

    // `AtomicLinkOps` is a unit struct, so its only value serves as the default instance.
    const NEW: Self::Ops = AtomicLinkOps;
}
46 
// Safe because the only way to mutate `AtomicLink` is via the `LinkedListOps` trait whose methods
// are all unsafe and require that the caller has first called `acquire_link` (and had it return
// true) to use them safely.
//
// The explicit impls are required because the `UnsafeCell<Option<NonNull<AtomicLink>>>` fields
// keep the compiler from implementing `Send` and `Sync` automatically.
unsafe impl Send for AtomicLink {}
unsafe impl Sync for AtomicLink {}
52 
// Stateless link operations for `AtomicLink`. The `LinkOps` and `LinkedListOps` implementations
// for this type provide the actual link manipulation.
#[derive(Copy, Clone, Default)]
pub struct AtomicLinkOps;
55 
56 unsafe impl LinkOps for AtomicLinkOps {
57     type LinkPtr = NonNull<AtomicLink>;
58 
acquire_link(&mut self, ptr: Self::LinkPtr) -> bool59     unsafe fn acquire_link(&mut self, ptr: Self::LinkPtr) -> bool {
60         !ptr.as_ref().linked.swap(true, Ordering::Acquire)
61     }
62 
release_link(&mut self, ptr: Self::LinkPtr)63     unsafe fn release_link(&mut self, ptr: Self::LinkPtr) {
64         ptr.as_ref().linked.store(false, Ordering::Release)
65     }
66 }
67 
68 unsafe impl LinkedListOps for AtomicLinkOps {
next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr>69     unsafe fn next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> {
70         *ptr.as_ref().next.get()
71     }
72 
prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr>73     unsafe fn prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> {
74         *ptr.as_ref().prev.get()
75     }
76 
set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>)77     unsafe fn set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>) {
78         *ptr.as_ref().next.get() = next;
79     }
80 
set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>)81     unsafe fn set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>) {
82         *ptr.as_ref().prev.get() = prev;
83     }
84 }
85 
// The kind of lock that a `Waiter` is waiting to acquire.
#[derive(Clone, Copy)]
pub enum Kind {
    // The waiter is waiting for a shared lock.
    Shared,
    // The waiter is waiting for an exclusive lock.
    Exclusive,
}
91 
// Tracks the progress of a `Waiter` and of the `WaitFuture` that polls it.
enum State {
    // Initial state, set by `Waiter::new` and `Waiter::reset`. No waker has been stored yet.
    Init,
    // The future was polled and returned `Poll::Pending`. Holds the waker from the most recent
    // poll, which `Waiter::wake` invokes.
    Waiting(Waker),
    // `Waiter::wake` was called. The next poll returns `Poll::Ready`.
    Woken,
    // The future returned `Poll::Ready`. Polling again panics.
    Finished,
    // Placeholder that `WaitFuture::poll` swaps in while it decides on the next state. Finding
    // this value in `poll` or when the future is dropped causes a panic.
    Processing,
}
99 
// Indicates the queue to which the waiter belongs. It is the responsibility of the Mutex and
// Condvar implementations to update this value when adding/removing a Waiter from their respective
// waiter lists.
//
// The explicit `u8` discriminants are the values stored in `Waiter::waiting_for` (an `AtomicU8`)
// and decoded again by `Waiter::is_waiting_for`.
#[repr(u8)]
#[derive(Debug, Eq, PartialEq)]
pub enum WaitingFor {
    // The waiter is either not linked into a waiter list or it is linked into a temporary list.
    None = 0,
    // The waiter is linked into the Mutex's waiter list.
    Mutex = 1,
    // The waiter is linked into the Condvar's waiter list.
    Condvar = 2,
}
113 
// Internal struct used to keep track of the cancellation function.
struct Cancel {
    // The cancellation function. It receives `data`, the `Waiter` being canceled, and a flag that
    // is true when the waiter had already been woken. It returns true if the cancellation was
    // processed successfully.
    c: fn(usize, &Waiter, bool) -> bool,
    // Opaque value handed to `c` as its first argument.
    data: usize,
}
119 
// Represents a thread currently blocked on a Condvar or on acquiring a Mutex.
pub struct Waiter {
    // Intrusive list link used by `WaiterAdapter` to place this `Waiter` in a `WaiterList`.
    link: AtomicLink,
    // Progress of the wait; shared between `WaitFuture` and `wake()`/`reset()`.
    state: SpinLock<State>,
    // Cancellation function (and its data) that runs when a `WaitFuture` is dropped before it
    // finishes.
    cancel: SpinLock<Cancel>,
    // Whether the waiter wants a shared or an exclusive lock.
    kind: Kind,
    // The `WaitingFor` value, stored as its `u8` discriminant.
    waiting_for: AtomicU8,
}
128 
129 impl Waiter {
130     // Create a new, initialized Waiter.
131     //
132     // `kind` should indicate whether this waiter represent a thread that is waiting for a shared
133     // lock or an exclusive lock.
134     //
135     // `cancel` is the function that is called when a `WaitFuture` (returned by the `wait()`
136     // function) is dropped before it can complete. `cancel_data` is used as the first parameter of
137     // the `cancel` function. The second parameter is the `Waiter` that was canceled and the third
138     // parameter indicates whether the `WaitFuture` was dropped after it was woken (but before it
139     // was polled to completion). The `cancel` function should return true if it was able to
140     // successfully process the cancellation. One reason why a `cancel` function may return false is
141     // if the `Waiter` was transferred to a different waiter list after the cancel function was
142     // called but before it was able to run. In this case, it is expected that the new waiter list
143     // updated the cancel function (by calling `set_cancel`) and the cancellation will be retried by
144     // fetching and calling the new cancellation function.
145     //
146     // `waiting_for` indicates the waiter list to which this `Waiter` will be added. See the
147     // documentation of the `WaitingFor` enum for the meaning of the different values.
new( kind: Kind, cancel: fn(usize, &Waiter, bool) -> bool, cancel_data: usize, waiting_for: WaitingFor, ) -> Waiter148     pub fn new(
149         kind: Kind,
150         cancel: fn(usize, &Waiter, bool) -> bool,
151         cancel_data: usize,
152         waiting_for: WaitingFor,
153     ) -> Waiter {
154         Waiter {
155             link: AtomicLink::new(),
156             state: SpinLock::new(State::Init),
157             cancel: SpinLock::new(Cancel {
158                 c: cancel,
159                 data: cancel_data,
160             }),
161             kind,
162             waiting_for: AtomicU8::new(waiting_for as u8),
163         }
164     }
165 
166     // The kind of lock that this `Waiter` is waiting to acquire.
kind(&self) -> Kind167     pub fn kind(&self) -> Kind {
168         self.kind
169     }
170 
171     // Returns true if this `Waiter` is currently linked into a waiter list.
is_linked(&self) -> bool172     pub fn is_linked(&self) -> bool {
173         self.link.is_linked()
174     }
175 
176     // Indicates the waiter list to which this `Waiter` belongs.
is_waiting_for(&self) -> WaitingFor177     pub fn is_waiting_for(&self) -> WaitingFor {
178         match self.waiting_for.load(Ordering::Acquire) {
179             0 => WaitingFor::None,
180             1 => WaitingFor::Mutex,
181             2 => WaitingFor::Condvar,
182             v => panic!("Unknown value for `WaitingFor`: {}", v),
183         }
184     }
185 
186     // Change the waiter list to which this `Waiter` belongs. This will panic if called when the
187     // `Waiter` is still linked into a waiter list.
set_waiting_for(&self, waiting_for: WaitingFor)188     pub fn set_waiting_for(&self, waiting_for: WaitingFor) {
189         self.waiting_for.store(waiting_for as u8, Ordering::Release);
190     }
191 
192     // Change the cancellation function that this `Waiter` should use. This will panic if called
193     // when the `Waiter` is still linked into a waiter list.
set_cancel(&self, c: fn(usize, &Waiter, bool) -> bool, data: usize)194     pub fn set_cancel(&self, c: fn(usize, &Waiter, bool) -> bool, data: usize) {
195         debug_assert!(
196             !self.is_linked(),
197             "Cannot change cancellation function while linked"
198         );
199         let mut cancel = self.cancel.lock();
200         cancel.c = c;
201         cancel.data = data;
202     }
203 
204     // Reset the Waiter back to its initial state. Panics if this `Waiter` is still linked into a
205     // waiter list.
reset(&self, waiting_for: WaitingFor)206     pub fn reset(&self, waiting_for: WaitingFor) {
207         debug_assert!(!self.is_linked(), "Cannot reset `Waiter` while linked");
208         self.set_waiting_for(waiting_for);
209 
210         let mut state = self.state.lock();
211         if let State::Waiting(waker) = mem::replace(&mut *state, State::Init) {
212             mem::drop(state);
213             mem::drop(waker);
214         }
215     }
216 
217     // Wait until woken up by another thread.
wait(&self) -> WaitFuture<'_>218     pub fn wait(&self) -> WaitFuture<'_> {
219         WaitFuture { waiter: self }
220     }
221 
222     // Wake up the thread associated with this `Waiter`. Panics if `waiting_for()` does not return
223     // `WaitingFor::None` or if `is_linked()` returns true.
wake(&self)224     pub fn wake(&self) {
225         debug_assert!(!self.is_linked(), "Cannot wake `Waiter` while linked");
226         debug_assert_eq!(self.is_waiting_for(), WaitingFor::None);
227 
228         let mut state = self.state.lock();
229 
230         if let State::Waiting(waker) = mem::replace(&mut *state, State::Woken) {
231             mem::drop(state);
232             waker.wake();
233         }
234     }
235 }
236 
// Future returned by `Waiter::wait`. It completes on the first poll after `Waiter::wake` has been
// called. Dropping it before it has completed runs the waiter's cancellation function.
pub struct WaitFuture<'w> {
    // The `Waiter` whose state this future polls.
    waiter: &'w Waiter,
}
240 
241 impl<'w> Future for WaitFuture<'w> {
242     type Output = ();
243 
poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>244     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
245         let mut state = self.waiter.state.lock();
246 
247         match mem::replace(&mut *state, State::Processing) {
248             State::Init => {
249                 *state = State::Waiting(cx.waker().clone());
250 
251                 Poll::Pending
252             }
253             State::Waiting(old_waker) => {
254                 *state = State::Waiting(cx.waker().clone());
255                 mem::drop(state);
256                 mem::drop(old_waker);
257 
258                 Poll::Pending
259             }
260             State::Woken => {
261                 *state = State::Finished;
262                 Poll::Ready(())
263             }
264             State::Finished => {
265                 panic!("Future polled after returning Poll::Ready");
266             }
267             State::Processing => {
268                 panic!("Unexpected waker state");
269             }
270         }
271     }
272 }
273 
274 impl<'w> Drop for WaitFuture<'w> {
drop(&mut self)275     fn drop(&mut self) {
276         let state = self.waiter.state.lock();
277 
278         match *state {
279             State::Finished => {}
280             State::Processing => panic!("Unexpected waker state"),
281             State::Woken => {
282                 mem::drop(state);
283 
284                 // We were woken but not polled.  Wake up the next waiter.
285                 let mut success = false;
286                 while !success {
287                     let cancel = self.waiter.cancel.lock();
288                     let c = cancel.c;
289                     let data = cancel.data;
290 
291                     mem::drop(cancel);
292 
293                     success = c(data, self.waiter, true);
294                 }
295             }
296             _ => {
297                 mem::drop(state);
298 
299                 // Not woken.  No need to wake up any waiters.
300                 let mut success = false;
301                 while !success {
302                     let cancel = self.waiter.cancel.lock();
303                     let c = cancel.c;
304                     let data = cancel.data;
305 
306                     mem::drop(cancel);
307 
308                     success = c(data, self.waiter, false);
309                 }
310             }
311         }
312     }
313 }
314 
// Generates `WaiterAdapter`, the intrusive-collections adapter that links `Arc<Waiter>` values
// into a list through the `link` field of `Waiter`.
intrusive_adapter!(pub WaiterAdapter = Arc<Waiter>: Waiter { link: AtomicLink });

// An intrusive linked list of `Arc<Waiter>`s.
pub type WaiterList = LinkedList<WaiterAdapter>;
318