1 //! Inject queue used to send wakeups to a work-stealing scheduler
2
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Mutex;
5 use crate::runtime::task;
6
7 use std::marker::PhantomData;
8 use std::ptr::NonNull;
9 use std::sync::atomic::Ordering::{Acquire, Release};
10
11 /// Growable, MPMC queue used to inject new tasks into the scheduler and as an
12 /// overflow queue when the local, fixed-size, array queue overflows.
13 pub(crate) struct Inject<T: 'static> {
14 /// Pointers to the head and tail of the queue.
15 pointers: Mutex<Pointers>,
16
17 /// Number of pending tasks in the queue. This helps prevent unnecessary
18 /// locking in the hot path.
19 len: AtomicUsize,
20
21 _p: PhantomData<T>,
22 }
23
24 struct Pointers {
25 /// True if the queue is closed.
26 is_closed: bool,
27
28 /// Linked-list head.
29 head: Option<NonNull<task::Header>>,
30
31 /// Linked-list tail.
32 tail: Option<NonNull<task::Header>>,
33 }
34
35 unsafe impl<T> Send for Inject<T> {}
36 unsafe impl<T> Sync for Inject<T> {}
37
38 impl<T: 'static> Inject<T> {
new() -> Inject<T>39 pub(crate) fn new() -> Inject<T> {
40 Inject {
41 pointers: Mutex::new(Pointers {
42 is_closed: false,
43 head: None,
44 tail: None,
45 }),
46 len: AtomicUsize::new(0),
47 _p: PhantomData,
48 }
49 }
50
is_empty(&self) -> bool51 pub(crate) fn is_empty(&self) -> bool {
52 self.len() == 0
53 }
54
55 /// Closes the injection queue, returns `true` if the queue is open when the
56 /// transition is made.
close(&self) -> bool57 pub(crate) fn close(&self) -> bool {
58 let mut p = self.pointers.lock();
59
60 if p.is_closed {
61 return false;
62 }
63
64 p.is_closed = true;
65 true
66 }
67
is_closed(&self) -> bool68 pub(crate) fn is_closed(&self) -> bool {
69 self.pointers.lock().is_closed
70 }
71
len(&self) -> usize72 pub(crate) fn len(&self) -> usize {
73 self.len.load(Acquire)
74 }
75
76 /// Pushes a value into the queue.
77 ///
78 /// This does nothing if the queue is closed.
push(&self, task: task::Notified<T>)79 pub(crate) fn push(&self, task: task::Notified<T>) {
80 // Acquire queue lock
81 let mut p = self.pointers.lock();
82
83 if p.is_closed {
84 return;
85 }
86
87 // safety: only mutated with the lock held
88 let len = unsafe { self.len.unsync_load() };
89 let task = task.into_raw();
90
91 // The next pointer should already be null
92 debug_assert!(get_next(task).is_none());
93
94 if let Some(tail) = p.tail {
95 // safety: Holding the Notified for a task guarantees exclusive
96 // access to the `queue_next` field.
97 set_next(tail, Some(task));
98 } else {
99 p.head = Some(task);
100 }
101
102 p.tail = Some(task);
103
104 self.len.store(len + 1, Release);
105 }
106
107 /// Pushes several values into the queue.
108 #[inline]
push_batch<I>(&self, mut iter: I) where I: Iterator<Item = task::Notified<T>>,109 pub(crate) fn push_batch<I>(&self, mut iter: I)
110 where
111 I: Iterator<Item = task::Notified<T>>,
112 {
113 let first = match iter.next() {
114 Some(first) => first.into_raw(),
115 None => return,
116 };
117
118 // Link up all the tasks.
119 let mut prev = first;
120 let mut counter = 1;
121
122 // We are going to be called with an `std::iter::Chain`, and that
123 // iterator overrides `for_each` to something that is easier for the
124 // compiler to optimize than a loop.
125 iter.for_each(|next| {
126 let next = next.into_raw();
127
128 // safety: Holding the Notified for a task guarantees exclusive
129 // access to the `queue_next` field.
130 set_next(prev, Some(next));
131 prev = next;
132 counter += 1;
133 });
134
135 // Now that the tasks are linked together, insert them into the
136 // linked list.
137 self.push_batch_inner(first, prev, counter);
138 }
139
140 /// Inserts several tasks that have been linked together into the queue.
141 ///
142 /// The provided head and tail may be be the same task. In this case, a
143 /// single task is inserted.
144 #[inline]
push_batch_inner( &self, batch_head: NonNull<task::Header>, batch_tail: NonNull<task::Header>, num: usize, )145 fn push_batch_inner(
146 &self,
147 batch_head: NonNull<task::Header>,
148 batch_tail: NonNull<task::Header>,
149 num: usize,
150 ) {
151 debug_assert!(get_next(batch_tail).is_none());
152
153 let mut p = self.pointers.lock();
154
155 if let Some(tail) = p.tail {
156 set_next(tail, Some(batch_head));
157 } else {
158 p.head = Some(batch_head);
159 }
160
161 p.tail = Some(batch_tail);
162
163 // Increment the count.
164 //
165 // safety: All updates to the len atomic are guarded by the mutex. As
166 // such, a non-atomic load followed by a store is safe.
167 let len = unsafe { self.len.unsync_load() };
168
169 self.len.store(len + num, Release);
170 }
171
pop(&self) -> Option<task::Notified<T>>172 pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
173 // Fast path, if len == 0, then there are no values
174 if self.is_empty() {
175 return None;
176 }
177
178 let mut p = self.pointers.lock();
179
180 // It is possible to hit null here if another thread popped the last
181 // task between us checking `len` and acquiring the lock.
182 let task = p.head?;
183
184 p.head = get_next(task);
185
186 if p.head.is_none() {
187 p.tail = None;
188 }
189
190 set_next(task, None);
191
192 // Decrement the count.
193 //
194 // safety: All updates to the len atomic are guarded by the mutex. As
195 // such, a non-atomic load followed by a store is safe.
196 self.len
197 .store(unsafe { self.len.unsync_load() } - 1, Release);
198
199 // safety: a `Notified` is pushed into the queue and now it is popped!
200 Some(unsafe { task::Notified::from_raw(task) })
201 }
202 }
203
204 impl<T: 'static> Drop for Inject<T> {
drop(&mut self)205 fn drop(&mut self) {
206 if !std::thread::panicking() {
207 assert!(self.pop().is_none(), "queue not empty");
208 }
209 }
210 }
211
get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>>212 fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
213 unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
214 }
215
set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>)216 fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
217 unsafe {
218 header.as_ref().set_next(val);
219 }
220 }
221