//! Run-queue structures to support a work-stealing scheduler

use crate::loom::cell::UnsafeCell;
use crate::loom::sync::Arc;
use crate::runtime::scheduler::multi_thread_alt::{Overflow, Stats};
use crate::runtime::task;

use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};

// Use wider integers when possible to increase ABA resilience.
//
// See issue #5041: <https://github.com/tokio-rs/tokio/issues/5041>.
cfg_has_atomic_u64! {
    type UnsignedShort = u32;
    type UnsignedLong = u64;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU32;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU64;
}
cfg_not_has_atomic_u64! {
    type UnsignedShort = u16;
    type UnsignedLong = u32;
    type AtomicUnsignedShort = crate::loom::sync::atomic::AtomicU16;
    type AtomicUnsignedLong = crate::loom::sync::atomic::AtomicU32;
}

/// Producer handle. May only be used from a single thread.
pub(crate) struct Local<T: 'static> {
    inner: Arc<Inner<T>>,
}

/// Consumer handle. May be used from many threads.
pub(crate) struct Steal<T: 'static>(Arc<Inner<T>>);

#[repr(align(128))]
pub(crate) struct Inner<T: 'static> {
    /// Concurrently updated by many threads.
    ///
    /// Contains two `UnsignedShort` values. The low half is the "real" head of
    /// the queue. The `UnsignedShort` in the high half is set by a stealer in
    /// the process of stealing values. It represents the first value being
    /// stolen in the batch. The `UnsignedShort` indices are intentionally wider
    /// than strictly required for buffer indexing in order to provide ABA
    /// mitigation and make it possible to distinguish between full and empty
    /// buffers.
    ///
    /// When both `UnsignedShort` values are the same, there is no active
    /// stealer.
    ///
    /// Tracking an in-progress stealer prevents a wrapping scenario.
    head: AtomicUnsignedLong,

    /// Only updated by producer thread but read by many threads.
    tail: AtomicUnsignedShort,

    /// Elements
    buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>]>,

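    /// Mask applied to a position to map it to a buffer index; always
    /// `capacity - 1`, which relies on the capacity being a power of two.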
    mask: usize,
}

unsafe impl<T> Send for Inner<T> {}
unsafe impl<T> Sync for Inner<T> {}

/// Create a new local run-queue
pub(crate) fn local<T: 'static>(capacity: usize) -> (Steal<T>, Local<T>) {
    assert!(capacity <= 4096);
    assert!(capacity >= 1);
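    // Positions are mapped to slots with `pos & (capacity - 1)`, so the
    // capacity must also be a power of two for the indexing below to be
    // correct.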

    let mut buffer = Vec::with_capacity(capacity);

    for _ in 0..capacity {
        buffer.push(UnsafeCell::new(MaybeUninit::uninit()));
    }

    let inner = Arc::new(Inner {
        head: AtomicUnsignedLong::new(0),
        tail: AtomicUnsignedShort::new(0),
        buffer: buffer.into_boxed_slice(),
        mask: capacity - 1,
    });

    let local = Local {
        inner: inner.clone(),
    };

    let remote = Steal(inner);

    (remote, local)
}

impl<T> Local<T> {
    /// How many tasks can be pushed into the queue
    pub(crate) fn remaining_slots(&self) -> usize {
        self.inner.remaining_slots()
    }

    pub(crate) fn max_capacity(&self) -> usize {
        self.inner.buffer.len()
    }

    /// Returns `true` if there are no entries in the queue
    pub(crate) fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

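    /// Returns `true` when at least half of the queue's capacity is free, i.e.
    /// there is room for the largest batch a single steal operation can move
    /// (stealers take at most half of a queue).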
    pub(crate) fn can_steal(&self) -> bool {
        self.remaining_slots() >= self.max_capacity() - self.max_capacity() / 2
    }

    /// Pushes a batch of tasks to the back of the queue. All tasks must fit in
    /// the local queue.
    ///
    /// # Panics
    ///
    /// The method panics if there is not enough capacity to fit all of the
    /// tasks in the queue.
    pub(crate) fn push_back(&mut self, tasks: impl ExactSizeIterator<Item = task::Notified<T>>) {
        let len = tasks.len();
        assert!(len <= self.inner.buffer.len());

        if len == 0 {
            // Nothing to do
            return;
        }

        let head = self.inner.head.load(Acquire);
        let (steal, real) = unpack(head);

        // safety: this is the **only** thread that updates this cell.
        let mut tail = unsafe { self.inner.tail.unsync_load() };

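        // `tail.wrapping_sub(steal)` is the number of occupied slots, counting
        // slots still claimed by an in-progress steal, which cannot be reused
        // yet.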
        if tail.wrapping_sub(steal) <= (self.inner.buffer.len() - len) as UnsignedShort {
            // Yes, this if condition is structured a bit weird (first block
            // does nothing, second panics). It is this way to match
            // `push_back_or_overflow`.
        } else {
            panic!(
                "not enough capacity; len={}; tail={}; steal={}; real={}",
                len, tail, steal, real
            );
        }

        for task in tasks {
            let idx = tail as usize & self.inner.mask;

            self.inner.buffer[idx].with_mut(|ptr| {
                // Write the task to the slot
                //
                // Safety: There is only one producer and the above `if`
                // condition ensures we don't touch a cell if there is a
                // value, thus no consumer.
                unsafe {
                    ptr::write((*ptr).as_mut_ptr(), task);
                }
            });

            tail = tail.wrapping_add(1);
        }

        self.inner.tail.store(tail, Release);
    }

    /// Pushes a task to the back of the local queue. If there is not enough
    /// capacity in the queue, this triggers the overflow operation.
    ///
    /// When the queue overflows, half of the current contents of the queue is
    /// moved to the given Injection queue. This frees up capacity for more
    /// tasks to be pushed into the local queue.
    pub(crate) fn push_back_or_overflow<O: Overflow<T>>(
        &mut self,
        mut task: task::Notified<T>,
        overflow: &O,
        stats: &mut Stats,
    ) {
        let tail = loop {
            let head = self.inner.head.load(Acquire);
            let (steal, real) = unpack(head);

            // safety: this is the **only** thread that updates this cell.
            let tail = unsafe { self.inner.tail.unsync_load() };

            if tail.wrapping_sub(steal) < self.inner.buffer.len() as UnsignedShort {
                // There is capacity for the task
                break tail;
            } else if steal != real {
                super::counters::inc_num_overflows();
                // Concurrently stealing, this will free up capacity, so only
                // push the task onto the inject queue
                overflow.push(task);
                return;
            } else {
                super::counters::inc_num_overflows();
                // Push the current task and half of the queue into the
                // inject queue.
                match self.push_overflow(task, real, tail, overflow, stats) {
                    Ok(_) => return,
                    // Lost the race, try again
                    Err(v) => {
                        task = v;
                    }
                }
            }
        };

        self.push_back_finish(task, tail);
    }

    // Second half of `push_back`
    fn push_back_finish(&self, task: task::Notified<T>, tail: UnsignedShort) {
        // Map the position to a slot index.
        let idx = tail as usize & self.inner.mask;

        self.inner.buffer[idx].with_mut(|ptr| {
            // Write the task to the slot
            //
            // Safety: There is only one producer and the capacity check in the
            // caller ensures we don't touch a cell if there is a value, thus
            // no consumer.
            unsafe {
                ptr::write((*ptr).as_mut_ptr(), task);
            }
        });

        // Make the task available. Synchronizes with a load in
        // `steal_into2`.
        self.inner.tail.store(tail.wrapping_add(1), Release);
    }

    /// Moves a batch of tasks into the inject queue.
    ///
    /// This will temporarily make some of the tasks unavailable to stealers.
    /// Once `push_overflow` is done, a notification is sent out, so if other
    /// workers "missed" some of the tasks during a steal, they will get
    /// another opportunity.
    #[inline(never)]
    fn push_overflow<O: Overflow<T>>(
        &mut self,
        task: task::Notified<T>,
        head: UnsignedShort,
        tail: UnsignedShort,
        overflow: &O,
        stats: &mut Stats,
    ) -> Result<(), task::Notified<T>> {
        // How many elements are we taking from the local queue.
        //
        // This is one less than the number of tasks pushed to the inject
        // queue as we are also inserting the `task` argument.
        let num_tasks_taken: UnsignedShort = (self.inner.buffer.len() / 2) as UnsignedShort;

        assert_eq!(
            tail.wrapping_sub(head) as usize,
            self.inner.buffer.len(),
            "queue is not full; tail = {}; head = {}",
            tail,
            head
        );

        let prev = pack(head, head);

        // Claim a bunch of tasks
        //
        // We are claiming the tasks **before** reading them out of the buffer.
        // This is safe because only the **current** thread is able to push new
        // tasks.
        //
        // There isn't really any need for memory ordering... Relaxed would
        // work. This is because all tasks are pushed into the queue from the
        // current thread (or memory has been acquired if the local queue handle
        // moved).
        if self
            .inner
            .head
            .compare_exchange(
                prev,
                pack(
                    head.wrapping_add(num_tasks_taken),
                    head.wrapping_add(num_tasks_taken),
                ),
                Release,
                Relaxed,
            )
            .is_err()
        {
            // We failed to claim the tasks, losing the race. Return out of
            // this function and try the full `push` routine again. The queue
            // may not be full anymore.
            return Err(task);
        }

        /// An iterator that takes elements out of the run queue.
        struct BatchTaskIter<'a, T: 'static> {
            buffer: &'a [UnsafeCell<MaybeUninit<task::Notified<T>>>],
            mask: usize,
            head: UnsignedLong,
            i: UnsignedLong,
            num: UnsignedShort,
        }
        impl<'a, T: 'static> Iterator for BatchTaskIter<'a, T> {
            type Item = task::Notified<T>;

            #[inline]
            fn next(&mut self) -> Option<task::Notified<T>> {
                if self.i == UnsignedLong::from(self.num) {
                    None
                } else {
                    let i_idx = self.i.wrapping_add(self.head) as usize & self.mask;
                    let slot = &self.buffer[i_idx];

                    // safety: Our CAS from before has assumed exclusive ownership
                    // of the task pointers in this range.
                    let task = slot.with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

                    self.i += 1;
                    Some(task)
                }
            }
        }

        // safety: The CAS above ensures that no consumer will look at these
        // values again, and we are the only producer.
        let batch_iter = BatchTaskIter {
            buffer: &self.inner.buffer,
            mask: self.inner.mask,
            head: head as UnsignedLong,
            i: 0,
            num: num_tasks_taken,
        };
        overflow.push_batch(batch_iter.chain(std::iter::once(task)));

        // Add 1 to factor in the task currently being scheduled.
        stats.incr_overflow_count();

        Ok(())
    }

    /// Pops a task from the local queue.
    pub(crate) fn pop(&mut self) -> Option<task::Notified<T>> {
        let mut head = self.inner.head.load(Acquire);

        let idx = loop {
            let (steal, real) = unpack(head);

            // safety: this is the **only** thread that updates this cell.
            let tail = unsafe { self.inner.tail.unsync_load() };

            if real == tail {
                // queue is empty
                return None;
            }

            let next_real = real.wrapping_add(1);

            // If `steal == real` there are no concurrent stealers. Both `steal`
            // and `real` are updated.
            let next = if steal == real {
                pack(next_real, next_real)
            } else {
                assert_ne!(steal, next_real);
                pack(steal, next_real)
            };

            // Attempt to claim a task.
            let res = self
                .inner
                .head
                .compare_exchange(head, next, AcqRel, Acquire);

            match res {
                Ok(_) => break real as usize & self.inner.mask,
                Err(actual) => head = actual,
            }
        };

        Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
    }
}

impl<T> Steal<T> {
    pub(crate) fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Steals half the tasks from `self` and places them into `dst`.
    pub(crate) fn steal_into(
        &self,
        dst: &mut Local<T>,
        dst_stats: &mut Stats,
    ) -> Option<task::Notified<T>> {
        // Safety: the caller is the only thread that mutates `dst.tail` and
        // holds a mutable reference.
        let dst_tail = unsafe { dst.inner.tail.unsync_load() };

        // To the caller, `dst` may **look** empty but still have values
        // contained in the buffer. If another thread is concurrently stealing
        // from `dst` there may not be enough capacity to steal.
        let (steal, _) = unpack(dst.inner.head.load(Acquire));

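        // Abort unless `dst` has room for the largest batch a steal can move,
        // i.e. half of a buffer (the queues are assumed to share a capacity).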
        if dst_tail.wrapping_sub(steal) > self.0.buffer.len() as UnsignedShort / 2 {
            // we *could* try to steal less here, but for simplicity, we're just
            // going to abort.
            return None;
        }

        // Steal the tasks into `dst`'s buffer. This does not yet expose the
        // tasks in `dst`.
        let mut n = self.steal_into2(dst, dst_tail);

        if n == 0 {
            // No tasks were stolen
            return None;
        }

        super::counters::inc_num_steals();

        dst_stats.incr_steal_count(n as u16);
        dst_stats.incr_steal_operations();

        // We are returning a task here
        n -= 1;

        let ret_pos = dst_tail.wrapping_add(n);
        let ret_idx = ret_pos as usize & dst.inner.mask;

        // safety: the value was written as part of `steal_into2` and not
        // exposed to stealers, so no other thread can access it.
        let ret = dst.inner.buffer[ret_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

        if n == 0 {
            // The `dst` queue is empty, but a single task was stolen
            return Some(ret);
        }

        // Make the stolen items available to consumers
        dst.inner.tail.store(dst_tail.wrapping_add(n), Release);

        Some(ret)
    }

    // Steal tasks from `self`, placing them into `dst`. Returns the number of
    // tasks that were stolen.
    fn steal_into2(&self, dst: &mut Local<T>, dst_tail: UnsignedShort) -> UnsignedShort {
        let mut prev_packed = self.0.head.load(Acquire);
        let mut next_packed;

        let n = loop {
            let (src_head_steal, src_head_real) = unpack(prev_packed);
            let src_tail = self.0.tail.load(Acquire);

            // If these two do not match, another thread is concurrently
            // stealing from the queue.
            if src_head_steal != src_head_real {
                return 0;
            }

            // Number of available tasks to steal
            let n = src_tail.wrapping_sub(src_head_real);
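            // Take half of them, rounding up: `n - n / 2` is the ceiling of `n / 2`.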
            let n = n - n / 2;

            if n == 0 {
                // No tasks available to steal
                return 0;
            }

            // Update the real head index to acquire the tasks.
            let steal_to = src_head_real.wrapping_add(n);
            assert_ne!(src_head_steal, steal_to);
            next_packed = pack(src_head_steal, steal_to);

            // Claim all those tasks. This is done by incrementing the "real"
            // head but not the steal. By doing this, no other thread is able to
            // steal from this queue until the current thread completes.
            let res = self
                .0
                .head
                .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => break n,
                Err(actual) => prev_packed = actual,
            }
        };

        debug_assert!(
            n <= (self.0.buffer.len() - self.0.buffer.len() / 2) as UnsignedShort,
            "actual = {}",
            n
        );

        let (first, _) = unpack(next_packed);

        // Take all the tasks
        for i in 0..n {
            // Compute the positions
            let src_pos = first.wrapping_add(i);
            let dst_pos = dst_tail.wrapping_add(i);

            // Map to slots
            let src_idx = src_pos as usize & self.0.mask;
            let dst_idx = dst_pos as usize & dst.inner.mask;

            // Read the task
            //
            // safety: We acquired the task with the atomic exchange above.
            let task = self.0.buffer[src_idx].with(|ptr| unsafe { ptr::read((*ptr).as_ptr()) });

            // Write the task to the new slot
            //
            // safety: `dst` queue is empty and we are the only producer to
            // this queue.
            dst.inner.buffer[dst_idx]
                .with_mut(|ptr| unsafe { ptr::write((*ptr).as_mut_ptr(), task) });
        }

        let mut prev_packed = next_packed;

        // Update `src_head_steal` to match `src_head_real` signalling that the
        // stealing routine is complete.
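        //
        // Only this stealer changes the `steal` half while it is set, so a
        // failed CAS below means the producer popped tasks and advanced the
        // `real` half; retry with the updated value.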
        loop {
            let head = unpack(prev_packed).1;
            next_packed = pack(head, head);

            let res = self
                .0
                .head
                .compare_exchange(prev_packed, next_packed, AcqRel, Acquire);

            match res {
                Ok(_) => return n,
                Err(actual) => {
                    let (actual_steal, actual_real) = unpack(actual);

                    assert_ne!(actual_steal, actual_real);

                    prev_packed = actual;
                }
            }
        }
    }
}

cfg_unstable_metrics! {
    impl<T> Steal<T> {
        pub(crate) fn len(&self) -> usize {
            self.0.len() as _
        }
    }
}

impl<T> Clone for Steal<T> {
    fn clone(&self) -> Steal<T> {
        Steal(self.0.clone())
    }
}

impl<T> Drop for Local<T> {
    fn drop(&mut self) {
        if !std::thread::panicking() {
            assert!(self.pop().is_none(), "queue not empty");
        }
    }
}

impl<T> Inner<T> {
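    /// Number of slots that can currently be written to.
    ///
    /// Computed against the `steal` half of the head: slots that are still
    /// being stolen cannot be reused yet, so this count is conservative.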
    fn remaining_slots(&self) -> usize {
        let (steal, _) = unpack(self.head.load(Acquire));
        let tail = self.tail.load(Acquire);

        self.buffer.len() - (tail.wrapping_sub(steal) as usize)
    }

    fn len(&self) -> UnsignedShort {
        let (_, head) = unpack(self.head.load(Acquire));
        let tail = self.tail.load(Acquire);

        tail.wrapping_sub(head)
    }

    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

/// Split the head value into the real head and the index a stealer is working
/// on.
fn unpack(n: UnsignedLong) -> (UnsignedShort, UnsignedShort) {
    let real = n & UnsignedShort::MAX as UnsignedLong;
    let steal = n >> (mem::size_of::<UnsignedShort>() * 8);

    (steal as UnsignedShort, real as UnsignedShort)
}

/// Join the two head values
fn pack(steal: UnsignedShort, real: UnsignedShort) -> UnsignedLong {
    (real as UnsignedLong) | ((steal as UnsignedLong) << (mem::size_of::<UnsignedShort>() * 8))
}
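
// A minimal, self-contained sketch of the `pack`/`unpack` round trip, assuming
// the type aliases above; illustrative only.
#[cfg(test)]
mod pack_layout_tests {
    use super::{pack, unpack};

    #[test]
    fn head_round_trips_through_pack_and_unpack() {
        // `pack` stores `real` in the low half and `steal` in the high half of
        // the wide head word; `unpack` returns `(steal, real)`.
        let packed = pack(5, 3);
        assert_eq!(unpack(packed), (5, 3));

        // Wrapped indices survive the round trip as well.
        let max = super::UnsignedShort::MAX;
        assert_eq!(unpack(pack(max, 0)), (max, 0));
    }
}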