• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use crate::loom::sync::atomic::AtomicPtr;
2 use crate::runtime::task::{Header, Task};
3 
4 use std::marker::PhantomData;
5 use std::ptr::{self, NonNull};
6 use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
7 
8 /// Concurrent stack of tasks, used to pass ownership of a task from one worker
9 /// to another.
10 pub(crate) struct TransferStack<T: 'static> {
11     head: AtomicPtr<Header>,
12     _p: PhantomData<T>,
13 }
14 
15 impl<T: 'static> TransferStack<T> {
new() -> TransferStack<T>16     pub(crate) fn new() -> TransferStack<T> {
17         TransferStack {
18             head: AtomicPtr::new(ptr::null_mut()),
19             _p: PhantomData,
20         }
21     }
22 
push(&self, task: Task<T>)23     pub(crate) fn push(&self, task: Task<T>) {
24         let task = task.into_raw();
25 
26         // We don't care about any memory associated w/ setting the `head`
27         // field, just the current value.
28         //
29         // The compare-exchange creates a release sequence.
30         let mut curr = self.head.load(Relaxed);
31 
32         loop {
33             unsafe {
34                 task.as_ref()
35                     .stack_next
36                     .with_mut(|ptr| *ptr = NonNull::new(curr))
37             };
38 
39             let res = self
40                 .head
41                 .compare_exchange(curr, task.as_ptr() as *mut _, Release, Relaxed);
42 
43             match res {
44                 Ok(_) => return,
45                 Err(actual) => {
46                     curr = actual;
47                 }
48             }
49         }
50     }
51 
drain(&self) -> impl Iterator<Item = Task<T>>52     pub(crate) fn drain(&self) -> impl Iterator<Item = Task<T>> {
53         struct Iter<T: 'static>(Option<NonNull<Header>>, PhantomData<T>);
54 
55         impl<T: 'static> Iterator for Iter<T> {
56             type Item = Task<T>;
57 
58             fn next(&mut self) -> Option<Task<T>> {
59                 let task = self.0?;
60 
61                 // Move the cursor forward
62                 self.0 = unsafe { task.as_ref().stack_next.with(|ptr| *ptr) };
63 
64                 // Return the task
65                 unsafe { Some(Task::from_raw(task)) }
66             }
67         }
68 
69         impl<T: 'static> Drop for Iter<T> {
70             fn drop(&mut self) {
71                 use std::process;
72 
73                 if self.0.is_some() {
74                     // we have bugs
75                     process::abort();
76                 }
77             }
78         }
79 
80         let ptr = self.head.swap(ptr::null_mut(), Acquire);
81         Iter(NonNull::new(ptr), PhantomData)
82     }
83 }
84