• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //! Core task module.
2 //!
3 //! # Safety
4 //!
5 //! The functions in this module are private to the `task` module. All of them
6 //! should be considered `unsafe` to use, but are not marked as such since it
7 //! would be too noisy.
8 //!
9 //! Make sure to consult the relevant safety section of each function before
10 //! use.
11 
12 use crate::loom::cell::UnsafeCell;
13 use crate::runtime::task::raw::{self, Vtable};
14 use crate::runtime::task::state::State;
15 use crate::runtime::task::{Notified, Schedule, Task};
16 use crate::util::linked_list;
17 
18 use std::future::Future;
19 use std::pin::Pin;
20 use std::ptr::NonNull;
21 use std::task::{Context, Poll, Waker};
22 
/// The task cell. Contains the components of the task.
///
/// It is critical for `Header` to be the first field as the task structure will
/// be referenced by both `*mut Cell` and `*mut Header`. `#[repr(C)]` keeps the
/// fields in declaration order, so the header is located at offset zero.
#[repr(C)]
pub(super) struct Cell<T: Future, S> {
    /// Hot task state data.
    pub(super) header: Header,

    /// Either the future or output, depending on the execution stage.
    pub(super) core: Core<T, S>,

    /// Cold data.
    pub(super) trailer: Trailer,
}
38 
/// Slot holding the scheduler a task is bound to.
///
/// The slot is created empty by `Cell::new` and is filled in by
/// `bind_scheduler`.
pub(super) struct Scheduler<S> {
    /// The bound scheduler, or `None` while no scheduler has been bound.
    scheduler: UnsafeCell<Option<S>>,
}
42 
/// Interior-mutable wrapper around the task's current `Stage`.
///
/// All access goes through raw pointers handed out by the inner cell; the
/// methods on this type document the exclusivity they require.
pub(super) struct CoreStage<T: Future> {
    /// The future, its output, or the consumed marker.
    stage: UnsafeCell<Stage<T>>,
}
46 
/// The core of the task.
///
/// Holds the future or output, depending on the stage of execution, together
/// with the scheduler slot for the task.
pub(super) struct Core<T: Future, S> {
    /// Scheduler used to drive this future.
    pub(super) scheduler: Scheduler<S>,

    /// Either the future or the output.
    pub(super) stage: CoreStage<T>,
}
57 
/// Crate public as this is also needed by the pool.
///
/// The `header_lte_cache_line` test in this file checks that this struct stays
/// within eight pointer widths.
#[repr(C)]
pub(crate) struct Header {
    /// Task state.
    pub(super) state: State,

    /// Intrusive linked-list pointers for this task; initialised empty by
    /// `Cell::new`.
    // NOTE(review): presumably links the task into its owner's list of tasks —
    // confirm against the users of `linked_list::Pointers<Header>`.
    pub(crate) owned: UnsafeCell<linked_list::Pointers<Header>>,

    /// Pointer to next task, used with the injection queue.
    pub(crate) queue_next: UnsafeCell<Option<NonNull<Header>>>,

    /// Pointer to the next task in the transfer stack.
    pub(super) stack_next: UnsafeCell<Option<NonNull<Header>>>,

    /// Table of function pointers for executing actions on the task.
    pub(super) vtable: &'static Vtable,
}
75 
// `Header` contains `NonNull` pointers, which are neither `Send` nor `Sync`, so
// these traits have to be asserted by hand.
// NOTE(review): soundness presumably relies on the access rules described in
// the module-level safety section — confirm before changing any field.
unsafe impl Send for Header {}
unsafe impl Sync for Header {}
78 
/// Cold data is stored after the future.
pub(super) struct Trailer {
    /// Consumer task waiting on completion of this task. `None` until a waker
    /// is stored with `set_waker`.
    pub(super) waker: UnsafeCell<Option<Waker>>,
}
84 
/// Either the future or the output.
pub(super) enum Stage<T: Future> {
    /// The future itself; this is the stage a freshly created cell starts in
    /// and the only stage in which `CoreStage::poll` may be called.
    Running(T),
    /// The task's result, as written by `CoreStage::store_output`.
    Finished(super::Result<T::Output>),
    /// Neither future nor output is stored any more: the future was dropped
    /// or the output was taken.
    Consumed,
}
91 
92 impl<T: Future, S: Schedule> Cell<T, S> {
93     /// Allocates a new task cell, containing the header, trailer, and core
94     /// structures.
new(future: T, state: State) -> Box<Cell<T, S>>95     pub(super) fn new(future: T, state: State) -> Box<Cell<T, S>> {
96         Box::new(Cell {
97             header: Header {
98                 state,
99                 owned: UnsafeCell::new(linked_list::Pointers::new()),
100                 queue_next: UnsafeCell::new(None),
101                 stack_next: UnsafeCell::new(None),
102                 vtable: raw::vtable::<T, S>(),
103             },
104             core: Core {
105                 scheduler: Scheduler {
106                     scheduler: UnsafeCell::new(None),
107                 },
108                 stage: CoreStage {
109                     stage: UnsafeCell::new(Stage::Running(future)),
110                 },
111             },
112             trailer: Trailer {
113                 waker: UnsafeCell::new(None),
114             },
115         })
116     }
117 }
118 
119 impl<S: Schedule> Scheduler<S> {
with_mut<R>(&self, f: impl FnOnce(*mut Option<S>) -> R) -> R120     pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Option<S>) -> R) -> R {
121         self.scheduler.with_mut(f)
122     }
123 
124     /// Bind a scheduler to the task.
125     ///
126     /// This only happens on the first poll and must be preceeded by a call to
127     /// `is_bound` to determine if binding is appropriate or not.
128     ///
129     /// # Safety
130     ///
131     /// Binding must not be done concurrently since it will mutate the task
132     /// core through a shared reference.
bind_scheduler(&self, task: Task<S>)133     pub(super) fn bind_scheduler(&self, task: Task<S>) {
134         // This function may be called concurrently, but the __first__ time it
135         // is called, the caller has unique access to this field. All subsequent
136         // concurrent calls will be via the `Waker`, which will "happens after"
137         // the first poll.
138         //
139         // In other words, it is always safe to read the field and it is safe to
140         // write to the field when it is `None`.
141         debug_assert!(!self.is_bound());
142 
143         // Bind the task to the scheduler
144         let scheduler = S::bind(task);
145 
146         // Safety: As `scheduler` is not set, this is the first poll
147         self.scheduler.with_mut(|ptr| unsafe {
148             *ptr = Some(scheduler);
149         });
150     }
151 
152     /// Returns true if the task is bound to a scheduler.
is_bound(&self) -> bool153     pub(super) fn is_bound(&self) -> bool {
154         // Safety: never called concurrently w/ a mutation.
155         self.scheduler.with(|ptr| unsafe { (*ptr).is_some() })
156     }
157 
158     /// Schedule the future for execution
schedule(&self, task: Notified<S>)159     pub(super) fn schedule(&self, task: Notified<S>) {
160         self.scheduler.with(|ptr| {
161             // Safety: Can only be called after initial `poll`, which is the
162             // only time the field is mutated.
163             match unsafe { &*ptr } {
164                 Some(scheduler) => scheduler.schedule(task),
165                 None => panic!("no scheduler set"),
166             }
167         });
168     }
169 
170     /// Schedule the future for execution in the near future, yielding the
171     /// thread to other tasks.
yield_now(&self, task: Notified<S>)172     pub(super) fn yield_now(&self, task: Notified<S>) {
173         self.scheduler.with(|ptr| {
174             // Safety: Can only be called after initial `poll`, which is the
175             // only time the field is mutated.
176             match unsafe { &*ptr } {
177                 Some(scheduler) => scheduler.yield_now(task),
178                 None => panic!("no scheduler set"),
179             }
180         });
181     }
182 
183     /// Release the task
184     ///
185     /// If the `Scheduler` implementation is able to, it returns the `Task`
186     /// handle immediately. The caller of this function will batch a ref-dec
187     /// with a state change.
release(&self, task: Task<S>) -> Option<Task<S>>188     pub(super) fn release(&self, task: Task<S>) -> Option<Task<S>> {
189         use std::mem::ManuallyDrop;
190 
191         let task = ManuallyDrop::new(task);
192 
193         self.scheduler.with(|ptr| {
194             // Safety: Can only be called after initial `poll`, which is the
195             // only time the field is mutated.
196             match unsafe { &*ptr } {
197                 Some(scheduler) => scheduler.release(&*task),
198                 // Task was never polled
199                 None => None,
200             }
201         })
202     }
203 }
204 
205 impl<T: Future> CoreStage<T> {
with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R206     pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
207         self.stage.with_mut(f)
208     }
209 
210     /// Poll the future
211     ///
212     /// # Safety
213     ///
214     /// The caller must ensure it is safe to mutate the `state` field. This
215     /// requires ensuring mutal exclusion between any concurrent thread that
216     /// might modify the future or output field.
217     ///
218     /// The mutual exclusion is implemented by `Harness` and the `Lifecycle`
219     /// component of the task state.
220     ///
221     /// `self` must also be pinned. This is handled by storing the task on the
222     /// heap.
poll(&self, mut cx: Context<'_>) -> Poll<T::Output>223     pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
224         let res = {
225             self.stage.with_mut(|ptr| {
226                 // Safety: The caller ensures mutual exclusion to the field.
227                 let future = match unsafe { &mut *ptr } {
228                     Stage::Running(future) => future,
229                     _ => unreachable!("unexpected stage"),
230                 };
231 
232                 // Safety: The caller ensures the future is pinned.
233                 let future = unsafe { Pin::new_unchecked(future) };
234 
235                 future.poll(&mut cx)
236             })
237         };
238 
239         if res.is_ready() {
240             self.drop_future_or_output();
241         }
242 
243         res
244     }
245 
246     /// Drop the future
247     ///
248     /// # Safety
249     ///
250     /// The caller must ensure it is safe to mutate the `stage` field.
drop_future_or_output(&self)251     pub(super) fn drop_future_or_output(&self) {
252         // Safety: the caller ensures mutual exclusion to the field.
253         unsafe {
254             self.set_stage(Stage::Consumed);
255         }
256     }
257 
258     /// Store the task output
259     ///
260     /// # Safety
261     ///
262     /// The caller must ensure it is safe to mutate the `stage` field.
store_output(&self, output: super::Result<T::Output>)263     pub(super) fn store_output(&self, output: super::Result<T::Output>) {
264         // Safety: the caller ensures mutual exclusion to the field.
265         unsafe {
266             self.set_stage(Stage::Finished(output));
267         }
268     }
269 
270     /// Take the task output
271     ///
272     /// # Safety
273     ///
274     /// The caller must ensure it is safe to mutate the `stage` field.
take_output(&self) -> super::Result<T::Output>275     pub(super) fn take_output(&self) -> super::Result<T::Output> {
276         use std::mem;
277 
278         self.stage.with_mut(|ptr| {
279             // Safety:: the caller ensures mutal exclusion to the field.
280             match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
281                 Stage::Finished(output) => output,
282                 _ => panic!("unexpected task state"),
283             }
284         })
285     }
286 
set_stage(&self, stage: Stage<T>)287     unsafe fn set_stage(&self, stage: Stage<T>) {
288         self.stage.with_mut(|ptr| *ptr = stage)
289     }
290 }
291 
292 cfg_rt_multi_thread! {
293     impl Header {
294         pub(crate) fn shutdown(&self) {
295             use crate::runtime::task::RawTask;
296 
297             let task = unsafe { RawTask::from_raw(self.into()) };
298             task.shutdown();
299         }
300 
301         pub(crate) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
302             self.queue_next.with_mut(|ptr| *ptr = next);
303         }
304     }
305 }
306 
307 impl Trailer {
set_waker(&self, waker: Option<Waker>)308     pub(crate) unsafe fn set_waker(&self, waker: Option<Waker>) {
309         self.waker.with_mut(|ptr| {
310             *ptr = waker;
311         });
312     }
313 
will_wake(&self, waker: &Waker) -> bool314     pub(crate) unsafe fn will_wake(&self, waker: &Waker) -> bool {
315         self.waker
316             .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
317     }
318 
wake_join(&self)319     pub(crate) fn wake_join(&self) {
320         self.waker.with(|ptr| match unsafe { &*ptr } {
321             Some(waker) => waker.wake_by_ref(),
322             None => panic!("waker missing"),
323         });
324     }
325 }
326 
/// Guards the size of `Header`: it must not exceed eight pointer widths
/// (64 bytes on a target with 8-byte pointers).
#[test]
#[cfg(not(loom))]
fn header_lte_cache_line() {
    use std::mem::size_of;

    let budget = 8 * size_of::<*const ()>();
    let actual = size_of::<Header>();

    assert!(actual <= budget);
}
334