//! Inject queue used to send wakeups to a work-stealing scheduler

use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::runtime::task;

use std::marker::PhantomData;
use std::ptr::NonNull;
use std::sync::atomic::Ordering::{Acquire, Release};

/// Growable, MPMC queue used to inject new tasks into the scheduler and as an
/// overflow queue when the local, fixed-size array queue overflows.
pub(crate) struct Inject<T: 'static> {
    /// Pointers to the head and tail of the queue.
    pointers: Mutex<Pointers>,

    /// Number of pending tasks in the queue. This helps prevent unnecessary
    /// locking in the hot path.
    len: AtomicUsize,

    _p: PhantomData<T>,
}

struct Pointers {
    /// True if the queue is closed.
    is_closed: bool,

    /// Linked-list head.
    head: Option<NonNull<task::Header>>,

    /// Linked-list tail.
    tail: Option<NonNull<task::Header>>,
}

unsafe impl<T> Send for Inject<T> {}
unsafe impl<T> Sync for Inject<T> {}

impl<T: 'static> Inject<T> {
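    /// Creates an empty injection queue.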
    pub(crate) fn new() -> Inject<T> {
        Inject {
            pointers: Mutex::new(Pointers {
                is_closed: false,
                head: None,
                tail: None,
            }),
            len: AtomicUsize::new(0),
            _p: PhantomData,
        }
    }

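    /// Returns `true` if the queue contains no tasks.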
    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Closes the injection queue. Returns `true` if the queue was open when
    /// the transition was made.
    pub(crate) fn close(&self) -> bool {
        let mut p = self.pointers.lock();

        if p.is_closed {
            return false;
        }

        p.is_closed = true;
        true
    }

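    /// Returns `true` if the queue has been closed.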
    pub(crate) fn is_closed(&self) -> bool {
        self.pointers.lock().is_closed
    }

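    /// Returns the number of tasks currently pending in the queue.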
    pub(crate) fn len(&self) -> usize {
        self.len.load(Acquire)
    }

    /// Pushes a value into the queue.
    ///
    /// This does nothing if the queue is closed.
    pub(crate) fn push(&self, task: task::Notified<T>) {
        // Acquire queue lock
        let mut p = self.pointers.lock();

        if p.is_closed {
            return;
        }

        // safety: only mutated with the lock held
        let len = unsafe { self.len.unsync_load() };
        let task = task.into_raw();

        // The next pointer should already be null
        debug_assert!(get_next(task).is_none());

        if let Some(tail) = p.tail {
            // safety: Holding the Notified for a task guarantees exclusive
            // access to the `queue_next` field.
            set_next(tail, Some(task));
        } else {
            p.head = Some(task);
        }

        p.tail = Some(task);

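        // The Release store pairs with the Acquire load in `len()`, letting
        // `pop()` skip the lock when the queue looks empty; the mutex remains
        // the synchronization point for the list itself.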
        self.len.store(len + 1, Release);
    }

    /// Pushes several values into the queue.
    #[inline]
    pub(crate) fn push_batch<I>(&self, mut iter: I)
    where
        I: Iterator<Item = task::Notified<T>>,
    {
        let first = match iter.next() {
            Some(first) => first.into_raw(),
            None => return,
        };

        // Link up all the tasks.
        let mut prev = first;
        let mut counter = 1;

        // We are going to be called with an `std::iter::Chain`, and that
        // iterator overrides `for_each` to something that is easier for the
        // compiler to optimize than a loop.
        iter.for_each(|next| {
            let next = next.into_raw();

            // safety: Holding the Notified for a task guarantees exclusive
            // access to the `queue_next` field.
            set_next(prev, Some(next));
            prev = next;
            counter += 1;
        });

        // Now that the tasks are linked together, insert them into the
        // linked list.
        self.push_batch_inner(first, prev, counter);
    }

    /// Inserts several tasks that have been linked together into the queue.
    ///
    /// The provided head and tail may be the same task. In this case, a
    /// single task is inserted.
    #[inline]
    fn push_batch_inner(
        &self,
        batch_head: NonNull<task::Header>,
        batch_tail: NonNull<task::Header>,
        num: usize,
    ) {
        debug_assert!(get_next(batch_tail).is_none());

        let mut p = self.pointers.lock();

        if let Some(tail) = p.tail {
            set_next(tail, Some(batch_head));
        } else {
            p.head = Some(batch_head);
        }

        p.tail = Some(batch_tail);

        // Increment the count.
        //
        // safety: All updates to the len atomic are guarded by the mutex. As
        // such, a non-atomic load followed by a store is safe.
        let len = unsafe { self.len.unsync_load() };

        self.len.store(len + num, Release);
    }

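    /// Pops a task from the front of the queue, or returns `None` if the
    /// queue is empty.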
    pub(crate) fn pop(&self) -> Option<task::Notified<T>> {
        // Fast path, if len == 0, then there are no values
        if self.is_empty() {
            return None;
        }

        let mut p = self.pointers.lock();

        // It is possible to hit null here if another thread popped the last
        // task between us checking `len` and acquiring the lock.
        let task = p.head?;

        p.head = get_next(task);

        if p.head.is_none() {
            p.tail = None;
        }

        set_next(task, None);

        // Decrement the count.
        //
        // safety: All updates to the len atomic are guarded by the mutex. As
        // such, a non-atomic load followed by a store is safe.
        self.len
            .store(unsafe { self.len.unsync_load() } - 1, Release);

        // safety: a `Notified` is pushed into the queue and now it is popped!
        Some(unsafe { task::Notified::from_raw(task) })
    }
}

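// The queue is expected to be drained before it is dropped: pushed tasks are
// held only as raw pointers, so anything still linked here would be leaked.
// The assertion below checks that invariant (it is skipped while panicking to
// avoid a double panic).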
impl<T: 'static> Drop for Inject<T> {
    fn drop(&mut self) {
        if !std::thread::panicking() {
            assert!(self.pop().is_none(), "queue not empty");
        }
    }
}

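// Helpers for reading and writing the intrusive `queue_next` link stored in
// each task header. Callers rely on having exclusive access to that field,
// as noted in the safety comments above.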
fn get_next(header: NonNull<task::Header>) -> Option<NonNull<task::Header>> {
    unsafe { header.as_ref().queue_next.with(|ptr| *ptr) }
}

fn set_next(header: NonNull<task::Header>, val: Option<NonNull<task::Header>>) {
    unsafe {
        header.as_ref().set_next(val);
    }
}