• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Chromium OS Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4 
5 use std::cell::UnsafeCell;
6 use std::future::Future;
7 use std::mem;
8 use std::pin::Pin;
9 use std::ptr::NonNull;
10 use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
11 use std::sync::Arc;
12 use std::task::{Context, Poll, Waker};
13 
14 use intrusive_collections::linked_list::{LinkedList, LinkedListOps};
15 use intrusive_collections::{intrusive_adapter, DefaultLinkOps, LinkOps};
16 
17 use crate::sync::SpinLock;
18 
19 // An atomic version of a LinkedListLink. See https://github.com/Amanieu/intrusive-rs/issues/47 for
20 // more details.
21 #[repr(align(128))]
22 pub struct AtomicLink {
23     prev: UnsafeCell<Option<NonNull<AtomicLink>>>,
24     next: UnsafeCell<Option<NonNull<AtomicLink>>>,
25     linked: AtomicBool,
26 }
27 
28 impl AtomicLink {
new() -> AtomicLink29     fn new() -> AtomicLink {
30         AtomicLink {
31             linked: AtomicBool::new(false),
32             prev: UnsafeCell::new(None),
33             next: UnsafeCell::new(None),
34         }
35     }
36 
is_linked(&self) -> bool37     fn is_linked(&self) -> bool {
38         self.linked.load(Ordering::Relaxed)
39     }
40 }
41 
42 impl DefaultLinkOps for AtomicLink {
43     type Ops = AtomicLinkOps;
44 
45     const NEW: Self::Ops = AtomicLinkOps;
46 }
47 
48 // Safe because the only way to mutate `AtomicLink` is via the `LinkedListOps` trait whose methods
49 // are all unsafe and require that the caller has first called `acquire_link` (and had it return
50 // true) to use them safely.
51 unsafe impl Send for AtomicLink {}
52 unsafe impl Sync for AtomicLink {}
53 
54 #[derive(Copy, Clone, Default)]
55 pub struct AtomicLinkOps;
56 
57 unsafe impl LinkOps for AtomicLinkOps {
58     type LinkPtr = NonNull<AtomicLink>;
59 
acquire_link(&mut self, ptr: Self::LinkPtr) -> bool60     unsafe fn acquire_link(&mut self, ptr: Self::LinkPtr) -> bool {
61         !ptr.as_ref().linked.swap(true, Ordering::Acquire)
62     }
63 
release_link(&mut self, ptr: Self::LinkPtr)64     unsafe fn release_link(&mut self, ptr: Self::LinkPtr) {
65         ptr.as_ref().linked.store(false, Ordering::Release)
66     }
67 }
68 
69 unsafe impl LinkedListOps for AtomicLinkOps {
next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr>70     unsafe fn next(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> {
71         *ptr.as_ref().next.get()
72     }
73 
prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr>74     unsafe fn prev(&self, ptr: Self::LinkPtr) -> Option<Self::LinkPtr> {
75         *ptr.as_ref().prev.get()
76     }
77 
set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>)78     unsafe fn set_next(&mut self, ptr: Self::LinkPtr, next: Option<Self::LinkPtr>) {
79         *ptr.as_ref().next.get() = next;
80     }
81 
set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>)82     unsafe fn set_prev(&mut self, ptr: Self::LinkPtr, prev: Option<Self::LinkPtr>) {
83         *ptr.as_ref().prev.get() = prev;
84     }
85 }
86 
87 #[derive(Clone, Copy)]
88 pub enum Kind {
89     Shared,
90     Exclusive,
91 }
92 
93 enum State {
94     Init,
95     Waiting(Waker),
96     Woken,
97     Finished,
98     Processing,
99 }
100 
101 // Indicates the queue to which the waiter belongs. It is the responsibility of the Mutex and
102 // Condvar implementations to update this value when adding/removing a Waiter from their respective
103 // waiter lists.
104 #[repr(u8)]
105 #[derive(Debug, Eq, PartialEq)]
106 pub enum WaitingFor {
107     // The waiter is either not linked into  a waiter list or it is linked into a temporary list.
108     None = 0,
109     // The waiter is linked into the Mutex's waiter list.
110     Mutex = 1,
111     // The waiter is linked into the Condvar's waiter list.
112     Condvar = 2,
113 }
114 
115 // Represents a thread currently blocked on a Condvar or on acquiring a Mutex.
116 pub struct Waiter {
117     link: AtomicLink,
118     state: SpinLock<State>,
119     cancel: fn(usize, &Waiter, bool),
120     cancel_data: usize,
121     kind: Kind,
122     waiting_for: AtomicU8,
123 }
124 
125 impl Waiter {
126     // Create a new, initialized Waiter.
127     //
128     // `kind` should indicate whether this waiter represent a thread that is waiting for a shared
129     // lock or an exclusive lock.
130     //
131     // `cancel` is the function that is called when a `WaitFuture` (returned by the `wait()`
132     // function) is dropped before it can complete. `cancel_data` is used as the first parameter of
133     // the `cancel` function. The second parameter is the `Waiter` that was canceled and the third
134     // parameter indicates whether the `WaitFuture` was dropped after it was woken (but before it
135     // was polled to completion). A value of `false` for the third parameter may already be stale
136     // by the time the cancel function runs and so does not guarantee that the waiter was not woken.
137     // In this case, implementations should still check if the Waiter was woken. However, a value of
138     // `true` guarantees that the waiter was already woken up so no additional checks are necessary.
139     // In this case, the cancel implementation should wake up the next waiter in its wait list, if
140     // any.
141     //
142     // `waiting_for` indicates the waiter list to which this `Waiter` will be added. See the
143     // documentation of the `WaitingFor` enum for the meaning of the different values.
new( kind: Kind, cancel: fn(usize, &Waiter, bool), cancel_data: usize, waiting_for: WaitingFor, ) -> Waiter144     pub fn new(
145         kind: Kind,
146         cancel: fn(usize, &Waiter, bool),
147         cancel_data: usize,
148         waiting_for: WaitingFor,
149     ) -> Waiter {
150         Waiter {
151             link: AtomicLink::new(),
152             state: SpinLock::new(State::Init),
153             cancel,
154             cancel_data,
155             kind,
156             waiting_for: AtomicU8::new(waiting_for as u8),
157         }
158     }
159 
160     // The kind of lock that this `Waiter` is waiting to acquire.
kind(&self) -> Kind161     pub fn kind(&self) -> Kind {
162         self.kind
163     }
164 
165     // Returns true if this `Waiter` is currently linked into a waiter list.
is_linked(&self) -> bool166     pub fn is_linked(&self) -> bool {
167         self.link.is_linked()
168     }
169 
170     // Indicates the waiter list to which this `Waiter` belongs.
is_waiting_for(&self) -> WaitingFor171     pub fn is_waiting_for(&self) -> WaitingFor {
172         match self.waiting_for.load(Ordering::Acquire) {
173             0 => WaitingFor::None,
174             1 => WaitingFor::Mutex,
175             2 => WaitingFor::Condvar,
176             v => panic!("Unknown value for `WaitingFor`: {}", v),
177         }
178     }
179 
180     // Change the waiter list to which this `Waiter` belongs. This will panic if called when the
181     // `Waiter` is still linked into a waiter list.
set_waiting_for(&self, waiting_for: WaitingFor)182     pub fn set_waiting_for(&self, waiting_for: WaitingFor) {
183         self.waiting_for.store(waiting_for as u8, Ordering::Release);
184     }
185 
186     // Reset the Waiter back to its initial state. Panics if this `Waiter` is still linked into a
187     // waiter list.
reset(&self, waiting_for: WaitingFor)188     pub fn reset(&self, waiting_for: WaitingFor) {
189         debug_assert!(!self.is_linked(), "Cannot reset `Waiter` while linked");
190         self.set_waiting_for(waiting_for);
191 
192         let mut state = self.state.lock();
193         if let State::Waiting(waker) = mem::replace(&mut *state, State::Init) {
194             mem::drop(state);
195             mem::drop(waker);
196         }
197     }
198 
199     // Wait until woken up by another thread.
wait(&self) -> WaitFuture<'_>200     pub fn wait(&self) -> WaitFuture<'_> {
201         WaitFuture { waiter: self }
202     }
203 
204     // Wake up the thread associated with this `Waiter`. Panics if `waiting_for()` does not return
205     // `WaitingFor::None` or if `is_linked()` returns true.
wake(&self)206     pub fn wake(&self) {
207         debug_assert!(!self.is_linked(), "Cannot wake `Waiter` while linked");
208         debug_assert_eq!(self.is_waiting_for(), WaitingFor::None);
209 
210         let mut state = self.state.lock();
211 
212         if let State::Waiting(waker) = mem::replace(&mut *state, State::Woken) {
213             mem::drop(state);
214             waker.wake();
215         }
216     }
217 }
218 
219 pub struct WaitFuture<'w> {
220     waiter: &'w Waiter,
221 }
222 
223 impl<'w> Future for WaitFuture<'w> {
224     type Output = ();
225 
poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>226     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
227         let mut state = self.waiter.state.lock();
228 
229         match mem::replace(&mut *state, State::Processing) {
230             State::Init => {
231                 *state = State::Waiting(cx.waker().clone());
232 
233                 Poll::Pending
234             }
235             State::Waiting(old_waker) => {
236                 *state = State::Waiting(cx.waker().clone());
237                 mem::drop(state);
238                 mem::drop(old_waker);
239 
240                 Poll::Pending
241             }
242             State::Woken => {
243                 *state = State::Finished;
244                 Poll::Ready(())
245             }
246             State::Finished => {
247                 panic!("Future polled after returning Poll::Ready");
248             }
249             State::Processing => {
250                 panic!("Unexpected waker state");
251             }
252         }
253     }
254 }
255 
256 impl<'w> Drop for WaitFuture<'w> {
drop(&mut self)257     fn drop(&mut self) {
258         let state = self.waiter.state.lock();
259 
260         match *state {
261             State::Finished => {}
262             State::Processing => panic!("Unexpected waker state"),
263             State::Woken => {
264                 mem::drop(state);
265 
266                 // We were woken but not polled.  Wake up the next waiter.
267                 (self.waiter.cancel)(self.waiter.cancel_data, self.waiter, true);
268             }
269             _ => {
270                 mem::drop(state);
271 
272                 // Not woken.  No need to wake up any waiters.
273                 (self.waiter.cancel)(self.waiter.cancel_data, self.waiter, false);
274             }
275         }
276     }
277 }
278 
279 intrusive_adapter!(pub WaiterAdapter = Arc<Waiter>: Waiter { link: AtomicLink });
280 
281 pub type WaiterList = LinkedList<WaiterAdapter>;
282