//! Core task module.
//!
//! # Safety
//!
//! The functions in this module are private to the `task` module. All of them
//! should be considered `unsafe` to use, but are not marked as such since it
//! would be too noisy.
//!
//! Make sure to consult the relevant safety section of each function before
//! use.

use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::Schedule;
use crate::util::linked_list;

use std::pin::Pin;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};

/// The task cell. Contains the components of the task.
///
/// It is critical for `Header` to be the first field because the task
/// structure is referenced through both `*mut Cell` and `*mut Header`
/// pointers.
#[repr(C)]
pub(super) struct Cell<T: Future, S> {
    /// Hot task state data.
    pub(super) header: Header,

    /// Either the future or the output, depending on the execution stage.
    pub(super) core: Core<T, S>,

    /// Cold data.
    pub(super) trailer: Trailer,
}
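
// Illustrative sketch (hypothetical `Demo*` types, not part of the runtime):
// with `#[repr(C)]` and the header as the first field, a pointer to the outer
// struct has the same address as a pointer to the header, which is what makes
// it valid to reference the task through both `*mut Cell` and `*mut Header`.
#[test]
#[cfg(not(loom))]
fn repr_c_header_first_layout_sketch() {
    #[repr(C)]
    struct DemoHeader {
        state: u64,
    }

    #[repr(C)]
    struct DemoCell {
        header: DemoHeader,
        payload: u32,
    }

    let cell = DemoCell {
        header: DemoHeader { state: 7 },
        payload: 1,
    };

    // The first field of a `#[repr(C)]` struct lives at offset zero, so the
    // two pointers compare equal and the cast is sound.
    let cell_ptr: *const DemoCell = &cell;
    let header_ptr = cell_ptr as *const DemoHeader;
    assert_eq!(cell_ptr as usize, header_ptr as usize);
    assert_eq!(unsafe { (*header_ptr).state }, 7);
    assert_eq!(cell.payload, 1);
}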

pub(super) struct CoreStage<T: Future> {
    stage: UnsafeCell<Stage<T>>,
}

/// The core of the task.
///
/// Holds the future or output, depending on the stage of execution.
pub(super) struct Core<T: Future, S> {
    /// Scheduler used to drive this future.
    pub(super) scheduler: S,

    /// Either the future or the output.
    pub(super) stage: CoreStage<T>,
}

/// Crate-public as this is also needed by the pool.
#[repr(C)]
pub(crate) struct Header {
    /// Task state.
    pub(super) state: State,

    /// Pointers used by the intrusive linked list of tasks owned by the
    /// runtime (`OwnedTasks` / `LocalOwnedTasks`).
    pub(super) owned: UnsafeCell<linked_list::Pointers<Header>>,

    /// Pointer to next task, used with the injection queue.
    pub(super) queue_next: UnsafeCell<Option<NonNull<Header>>>,

    /// Table of function pointers for executing actions on the task.
    pub(super) vtable: &'static Vtable,

    /// This integer contains the id of the `OwnedTasks` or `LocalOwnedTasks`
    /// that this task is stored in. If the task is not in any list, it holds
    /// the id of the list that it was previously in, or zero if it has never
    /// been in any list.
    ///
    /// Once a task has been bound to a list, it can never be bound to another
    /// list, even if removed from the first list.
    ///
    /// The id is not unset when the task is removed from a list because we
    /// want to be able to read the id without synchronization, even if the
    /// task is concurrently being removed from the list.
    pub(super) owner_id: UnsafeCell<u64>,

    /// The tracing ID for this instrumented task.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    pub(super) id: Option<tracing::Id>,
}

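// `Header` contains `UnsafeCell` and `NonNull` fields, so it is not
// automatically `Send` or `Sync`. The impls below assert that access to those
// fields is synchronized externally, as described by the safety contracts on
// the methods in this module.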
unsafe impl Send for Header {}
unsafe impl Sync for Header {}

/// Cold data is stored after the future.
pub(super) struct Trailer {
    /// Consumer task waiting on completion of this task.
    pub(super) waker: UnsafeCell<Option<Waker>>,
}

/// Either the future or the output.
///
/// The stage starts out as `Running(future)`. The output is stored as
/// `Finished(..)` once the future completes, and the variant becomes
/// `Consumed` once the future or the output has been dropped or taken.
pub(super) enum Stage<T: Future> {
    Running(T),
    Finished(super::Result<T::Output>),
    Consumed,
}

impl<T: Future, S: Schedule> Cell<T, S> {
    /// Allocates a new task cell, containing the header, trailer, and core
    /// structures.
    pub(super) fn new(future: T, scheduler: S, state: State) -> Box<Cell<T, S>> {
        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let id = future.id();
        Box::new(Cell {
            header: Header {
                state,
                owned: UnsafeCell::new(linked_list::Pointers::new()),
                queue_next: UnsafeCell::new(None),
                vtable: raw::vtable::<T, S>(),
                owner_id: UnsafeCell::new(0),
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                id,
            },
            core: Core {
                scheduler,
                stage: CoreStage {
                    stage: UnsafeCell::new(Stage::Running(future)),
                },
            },
            trailer: Trailer {
                waker: UnsafeCell::new(None),
            },
        })
    }
}

impl<T: Future> CoreStage<T> {
    pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
        self.stage.with_mut(f)
    }

    /// Polls the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field. This
    /// requires ensuring mutual exclusion between any concurrent thread that
    /// might modify the future or output field.
    ///
    /// The mutual exclusion is implemented by `Harness` and the `Lifecycle`
    /// component of the task state.
    ///
    /// `self` must also be pinned. This is handled by storing the task on the
    /// heap.
    pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
        let res = {
            self.stage.with_mut(|ptr| {
                // Safety: The caller ensures mutual exclusion to the field.
                let future = match unsafe { &mut *ptr } {
                    Stage::Running(future) => future,
                    _ => unreachable!("unexpected stage"),
                };

                // Safety: The caller ensures the future is pinned.
                let future = unsafe { Pin::new_unchecked(future) };

                future.poll(&mut cx)
            })
        };

        if res.is_ready() {
            self.drop_future_or_output();
        }

        res
    }

    /// Drops the future or output, marking the stage as consumed.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn drop_future_or_output(&self) {
        // Safety: the caller ensures mutual exclusion to the field.
        unsafe {
            self.set_stage(Stage::Consumed);
        }
    }

    /// Stores the task output.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn store_output(&self, output: super::Result<T::Output>) {
        // Safety: the caller ensures mutual exclusion to the field.
        unsafe {
            self.set_stage(Stage::Finished(output));
        }
    }

    /// Takes the task output.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn take_output(&self) -> super::Result<T::Output> {
        use std::mem;

        self.stage.with_mut(|ptr| {
            // Safety: the caller ensures mutual exclusion to the field.
            match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
                Stage::Finished(output) => output,
                _ => panic!("JoinHandle polled after completion"),
            }
        })
    }

    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    unsafe fn set_stage(&self, stage: Stage<T>) {
        self.stage.with_mut(|ptr| *ptr = stage)
    }
}
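
// A minimal sketch of the stage lifecycle driven by the methods above, using a
// trivial `Unpin` stand-in future and a no-op waker defined only for this
// illustration. Single-threaded access provides the mutual exclusion required
// by the safety sections, and because the stand-in future is `Unpin`, the
// heap-pinning requirement documented on `poll` is not load-bearing here.
#[test]
#[cfg(not(loom))]
fn core_stage_lifecycle_sketch() {
    use std::task::{RawWaker, RawWakerVTable};

    // Hypothetical stand-in future that is `Unpin` and immediately ready.
    struct Five;

    impl Future for Five {
        type Output = u32;

        fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
            Poll::Ready(5)
        }
    }

    // A waker that does nothing; just enough to build a `Context` for polling.
    fn noop_waker() -> Waker {
        fn noop(_: *const ()) {}
        fn clone(ptr: *const ()) -> RawWaker {
            RawWaker::new(ptr, &VTABLE)
        }
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
    }

    let stage = CoreStage {
        stage: UnsafeCell::new(Stage::Running(Five)),
    };

    // Polling the ready future also drops it, leaving the stage `Consumed`.
    let waker = noop_waker();
    let output = match stage.poll(Context::from_waker(&waker)) {
        Poll::Ready(output) => output,
        Poll::Pending => unreachable!(),
    };

    // Store the output, then take it as a `JoinHandle` consumer would.
    stage.store_output(Ok(output));
    assert_eq!(stage.take_output().unwrap(), 5);
}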

cfg_rt_multi_thread! {
    impl Header {
        pub(super) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
            self.queue_next.with_mut(|ptr| *ptr = next);
        }
    }
}

impl Header {
    // Safety: The caller must guarantee exclusive access to this field, and
    // must ensure that the id is either 0 or the id of the OwnedTasks
    // containing this task.
    pub(super) unsafe fn set_owner_id(&self, owner: u64) {
        self.owner_id.with_mut(|ptr| *ptr = owner);
    }

    pub(super) fn get_owner_id(&self) -> u64 {
        // Safety: If there are concurrent writes, then that write has violated
        // the safety requirements on `set_owner_id`.
        unsafe { self.owner_id.with(|ptr| *ptr) }
    }
}

impl Trailer {
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `waker` field.
    pub(super) unsafe fn set_waker(&self, waker: Option<Waker>) {
        self.waker.with_mut(|ptr| {
            *ptr = waker;
        });
    }

    /// # Safety
    ///
    /// The caller must ensure it is safe to read the `waker` field.
    pub(super) unsafe fn will_wake(&self, waker: &Waker) -> bool {
        self.waker
            .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
    }

    pub(super) fn wake_join(&self) {
        self.waker.with(|ptr| match unsafe { &*ptr } {
            Some(waker) => waker.wake_by_ref(),
            None => panic!("waker missing"),
        });
    }
}

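// `Header` holds the hot task state; keep it within 8 pointer widths, which is
// 64 bytes on 64-bit targets, i.e. a typical cache line.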
#[test]
#[cfg(not(loom))]
fn header_lte_cache_line() {
    use std::mem::size_of;

    assert!(size_of::<Header>() <= 8 * size_of::<*const ()>());
}