//! Core task module.
//!
//! # Safety
//!
//! The functions in this module are private to the `task` module. All of them
//! should be considered `unsafe` to use, but are not marked as such since it
//! would be too noisy.
//!
//! Make sure to consult the relevant safety section of each function before
//! use.

use crate::future::Future;
use crate::loom::cell::UnsafeCell;
use crate::runtime::context;
use crate::runtime::task::raw::{self, Vtable};
use crate::runtime::task::state::State;
use crate::runtime::task::{Id, Schedule};
use crate::util::linked_list;

use std::num::NonZeroU64;
use std::pin::Pin;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};

/// The task cell. Contains the components of the task.
///
/// It is critical for `Header` to be the first field as the task structure will
/// be referenced by both *mut Cell and *mut Header.
///
/// Any changes to the layout of this struct _must_ also be reflected in the
/// const fns in raw.rs.
///
// This struct should be cache padded to avoid false sharing. The cache padding
// rules are copied from crossbeam-utils/src/cache_padded.rs.
//
// Starting from Intel's Sandy Bridge, the spatial prefetcher pulls pairs of
// 64-byte cache lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, riscv64, sparc, and hexagon have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L17
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/hexagon/include/asm/cache.h#L12
//
// riscv32 is assumed not to exceed the cache line size of riscv64.
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv32",
        target_arch = "riscv64",
        target_arch = "sparc",
        target_arch = "hexagon",
    ),
    repr(align(32))
)]
// m68k has 16-byte cache line size.
//
// Sources:
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/m68k/include/asm/cache.h#L9
#[cfg_attr(target_arch = "m68k", repr(align(16)))]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/s390/include/asm/cache.h#L13
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86, wasm, and sparc64 have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
// - https://github.com/torvalds/linux/blob/3516bd729358a2a9b090c1905bd2a3fa926e24c6/arch/sparc/include/asm/cache.h#L19
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv32",
        target_arch = "riscv64",
        target_arch = "sparc",
        target_arch = "hexagon",
        target_arch = "m68k",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
#[repr(C)]
pub(super) struct Cell<T: Future, S> {
    /// Hot task state data.
    pub(super) header: Header,

    /// Either the future or output, depending on the execution stage.
    pub(super) core: Core<T, S>,

    /// Cold data.
    pub(super) trailer: Trailer,
}
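
// Layout sketch (informal). `Cell` is `#[repr(C)]` with `Header` first, so a
// pointer to the cell is also a valid pointer to its header, and the remaining
// fields are reached through the offsets stored in the `Vtable` (see the
// getters in `impl Header` below):
//
//     +--------------------+ <- *mut Cell == *mut Header
//     | header: Header     |
//     +--------------------+ <- vtable.scheduler_offset
//     | core.scheduler: S  |
//     +--------------------+ <- vtable.id_offset
//     | core.task_id: Id   |
//     +--------------------+
//     | core.stage         |
//     +--------------------+ <- vtable.trailer_offset
//     | trailer: Trailer   |
//     +--------------------+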

pub(super) struct CoreStage<T: Future> {
    stage: UnsafeCell<Stage<T>>,
}

/// The core of the task.
///
/// Holds the future or output, depending on the stage of execution.
///
/// Any changes to the layout of this struct _must_ also be reflected in the
/// const fns in raw.rs.
#[repr(C)]
pub(super) struct Core<T: Future, S> {
    /// Scheduler used to drive this future.
    pub(super) scheduler: S,

    /// The task's ID, used for populating `JoinError`s.
    pub(super) task_id: Id,

    /// Either the future or the output.
    pub(super) stage: CoreStage<T>,
}

/// Crate public as this is also needed by the pool.
#[repr(C)]
pub(crate) struct Header {
    /// Task state.
    pub(super) state: State,

    /// Pointer to next task, used with the injection queue.
    pub(super) queue_next: UnsafeCell<Option<NonNull<Header>>>,

    /// Table of function pointers for executing actions on the task.
    pub(super) vtable: &'static Vtable,

    /// This integer holds the id of the `OwnedTasks` or `LocalOwnedTasks`
    /// that this task is stored in. If the task is not in any list, it holds
    /// the id of the list it was previously in, or `None` if it has never
    /// been in any list.
    ///
    /// Once a task has been bound to a list, it can never be bound to another
    /// list, even if removed from the first list.
    ///
    /// The id is not unset when removed from a list because we want to be
    /// able to read the id without synchronization, even if the task is
    /// concurrently being removed from the list.
    pub(super) owner_id: UnsafeCell<Option<NonZeroU64>>,

    /// The tracing ID for this instrumented task.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    pub(super) tracing_id: Option<tracing::Id>,
}

unsafe impl Send for Header {}
unsafe impl Sync for Header {}

/// Cold data is stored after the future. Data is considered cold if it is only
/// used during creation or shutdown of the task.
pub(super) struct Trailer {
    /// Pointers for the linked list in the `OwnedTasks` that owns this task.
    pub(super) owned: linked_list::Pointers<Header>,
    /// Consumer task waiting on completion of this task.
    pub(super) waker: UnsafeCell<Option<Waker>>,
}

generate_addr_of_methods! {
    impl<> Trailer {
        pub(super) unsafe fn addr_of_owned(self: NonNull<Self>) -> NonNull<linked_list::Pointers<Header>> {
            &self.owned
        }
    }
}

/// Either the future or the output.
pub(super) enum Stage<T: Future> {
    Running(T),
    Finished(super::Result<T::Output>),
    Consumed,
}
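
// Stage transitions (informal sketch of the happy path, pieced together from
// the methods on `Core` below): the future runs in `Running`, is dropped via
// `drop_future_or_output` once `poll` returns `Ready`, after which the output
// is stored and later handed to the `JoinHandle`:
//
//     Running(future)  --drop_future_or_output()--> Consumed
//     Consumed         --store_output(result)-----> Finished(result)
//     Finished(result) --take_output()------------> Consumed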

impl<T: Future, S: Schedule> Cell<T, S> {
    /// Allocates a new task cell, containing the header, trailer, and core
    /// structures.
    pub(super) fn new(future: T, scheduler: S, state: State, task_id: Id) -> Box<Cell<T, S>> {
        // Separated into a non-generic function to reduce LLVM codegen.
        fn new_header(
            state: State,
            vtable: &'static Vtable,
            #[cfg(all(tokio_unstable, feature = "tracing"))] tracing_id: Option<tracing::Id>,
        ) -> Header {
            Header {
                state,
                queue_next: UnsafeCell::new(None),
                vtable,
                owner_id: UnsafeCell::new(None),
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                tracing_id,
            }
        }

        #[cfg(all(tokio_unstable, feature = "tracing"))]
        let tracing_id = future.id();
        let vtable = raw::vtable::<T, S>();
        let result = Box::new(Cell {
            header: new_header(
                state,
                vtable,
                #[cfg(all(tokio_unstable, feature = "tracing"))]
                tracing_id,
            ),
            core: Core {
                scheduler,
                stage: CoreStage {
                    stage: UnsafeCell::new(Stage::Running(future)),
                },
                task_id,
            },
            trailer: Trailer::new(),
        });

        #[cfg(debug_assertions)]
        {
            // Using a separate function for this code avoids instantiating it separately for every `T`.
            unsafe fn check<S>(header: &Header, trailer: &Trailer, scheduler: &S, task_id: &Id) {
                let trailer_addr = trailer as *const Trailer as usize;
                let trailer_ptr = unsafe { Header::get_trailer(NonNull::from(header)) };
                assert_eq!(trailer_addr, trailer_ptr.as_ptr() as usize);

                let scheduler_addr = scheduler as *const S as usize;
                let scheduler_ptr = unsafe { Header::get_scheduler::<S>(NonNull::from(header)) };
                assert_eq!(scheduler_addr, scheduler_ptr.as_ptr() as usize);

                let id_addr = task_id as *const Id as usize;
                let id_ptr = unsafe { Header::get_id_ptr(NonNull::from(header)) };
                assert_eq!(id_addr, id_ptr.as_ptr() as usize);
            }
            unsafe {
                check(
                    &result.header,
                    &result.trailer,
                    &result.core.scheduler,
                    &result.core.task_id,
                );
            }
        }

        result
    }
}

impl<T: Future> CoreStage<T> {
    pub(super) fn with_mut<R>(&self, f: impl FnOnce(*mut Stage<T>) -> R) -> R {
        self.stage.with_mut(f)
    }
}

/// Set and clear the task id in the context when the future is executed or
/// dropped, or when the output produced by the future is dropped.
pub(crate) struct TaskIdGuard {
    parent_task_id: Option<Id>,
}

impl TaskIdGuard {
    fn enter(id: Id) -> Self {
        TaskIdGuard {
            parent_task_id: context::set_current_task_id(Some(id)),
        }
    }
}

impl Drop for TaskIdGuard {
    fn drop(&mut self) {
        context::set_current_task_id(self.parent_task_id);
    }
}
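
// Usage sketch (informal; `outer_id` and `inner_id` are hypothetical values,
// shown only to illustrate the save/restore behavior): `enter` stashes the
// previous task id, so guards nest and the outer id is restored on drop:
//
//     let _outer = TaskIdGuard::enter(outer_id);
//     {
//         let _inner = TaskIdGuard::enter(inner_id);
//         // the context now reports `inner_id` as the current task id
//     }
//     // dropping `_inner` restored `outer_id` as the current task id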

impl<T: Future, S: Schedule> Core<T, S> {
    /// Polls the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `state` field. This
    /// requires ensuring mutual exclusion between any concurrent thread that
    /// might modify the future or output field.
    ///
    /// The mutual exclusion is implemented by `Harness` and the `Lifecycle`
    /// component of the task state.
    ///
    /// `self` must also be pinned. This is handled by storing the task on the
    /// heap.
    pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
        let res = {
            self.stage.stage.with_mut(|ptr| {
                // Safety: The caller ensures mutual exclusion to the field.
                let future = match unsafe { &mut *ptr } {
                    Stage::Running(future) => future,
                    _ => unreachable!("unexpected stage"),
                };

                // Safety: The caller ensures the future is pinned.
                let future = unsafe { Pin::new_unchecked(future) };

                let _guard = TaskIdGuard::enter(self.task_id);
                future.poll(&mut cx)
            })
        };

        if res.is_ready() {
            self.drop_future_or_output();
        }

        res
    }

    /// Drops the future.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn drop_future_or_output(&self) {
        // Safety: the caller ensures mutual exclusion to the field.
        unsafe {
            self.set_stage(Stage::Consumed);
        }
    }

    /// Stores the task output.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn store_output(&self, output: super::Result<T::Output>) {
        // Safety: the caller ensures mutual exclusion to the field.
        unsafe {
            self.set_stage(Stage::Finished(output));
        }
    }

    /// Takes the task output.
    ///
    /// # Safety
    ///
    /// The caller must ensure it is safe to mutate the `stage` field.
    pub(super) fn take_output(&self) -> super::Result<T::Output> {
        use std::mem;

        self.stage.stage.with_mut(|ptr| {
            // Safety: the caller ensures mutual exclusion to the field.
            match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
                Stage::Finished(output) => output,
                _ => panic!("JoinHandle polled after completion"),
            }
        })
    }

    unsafe fn set_stage(&self, stage: Stage<T>) {
        let _guard = TaskIdGuard::enter(self.task_id);
        self.stage.stage.with_mut(|ptr| *ptr = stage)
    }
}

impl Header {
    pub(super) unsafe fn set_next(&self, next: Option<NonNull<Header>>) {
        self.queue_next.with_mut(|ptr| *ptr = next);
    }

    // safety: The caller must guarantee exclusive access to this field, and
    // must ensure that the id is either `None` or the id of the OwnedTasks
    // containing this task.
    pub(super) unsafe fn set_owner_id(&self, owner: NonZeroU64) {
        self.owner_id.with_mut(|ptr| *ptr = Some(owner));
    }

    pub(super) fn get_owner_id(&self) -> Option<NonZeroU64> {
        // safety: If there are concurrent writes, then that write has violated
        // the safety requirements on `set_owner_id`.
        unsafe { self.owner_id.with(|ptr| *ptr) }
    }

    /// Gets a pointer to the `Trailer` of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    pub(super) unsafe fn get_trailer(me: NonNull<Header>) -> NonNull<Trailer> {
        let offset = me.as_ref().vtable.trailer_offset;
        let trailer = me.as_ptr().cast::<u8>().add(offset).cast::<Trailer>();
        NonNull::new_unchecked(trailer)
    }

    /// Gets a pointer to the scheduler of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    ///
    /// The generic type `S` must be set to the correct scheduler type for this
    /// task.
    pub(super) unsafe fn get_scheduler<S>(me: NonNull<Header>) -> NonNull<S> {
        let offset = me.as_ref().vtable.scheduler_offset;
        let scheduler = me.as_ptr().cast::<u8>().add(offset).cast::<S>();
        NonNull::new_unchecked(scheduler)
    }

    /// Gets a pointer to the id of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    pub(super) unsafe fn get_id_ptr(me: NonNull<Header>) -> NonNull<Id> {
        let offset = me.as_ref().vtable.id_offset;
        let id = me.as_ptr().cast::<u8>().add(offset).cast::<Id>();
        NonNull::new_unchecked(id)
    }

    /// Gets the id of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    pub(super) unsafe fn get_id(me: NonNull<Header>) -> Id {
        let ptr = Header::get_id_ptr(me).as_ptr();
        *ptr
    }

    /// Gets the tracing id of the task containing this `Header`.
    ///
    /// # Safety
    ///
    /// The provided raw pointer must point at the header of a task.
    #[cfg(all(tokio_unstable, feature = "tracing"))]
    pub(super) unsafe fn get_tracing_id(me: &NonNull<Header>) -> Option<&tracing::Id> {
        me.as_ref().tracing_id.as_ref()
    }
}

impl Trailer {
    fn new() -> Self {
        Trailer {
            waker: UnsafeCell::new(None),
            owned: linked_list::Pointers::new(),
        }
    }

    pub(super) unsafe fn set_waker(&self, waker: Option<Waker>) {
        self.waker.with_mut(|ptr| {
            *ptr = waker;
        });
    }

    pub(super) unsafe fn will_wake(&self, waker: &Waker) -> bool {
        self.waker
            .with(|ptr| (*ptr).as_ref().unwrap().will_wake(waker))
    }

    pub(super) fn wake_join(&self) {
        self.waker.with(|ptr| match unsafe { &*ptr } {
            Some(waker) => waker.wake_by_ref(),
            None => panic!("waker missing"),
        });
    }
}
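
// Waker protocol sketch (informal): the `JoinHandle` consumer stores its waker
// with `set_waker`, can skip a redundant store when `will_wake` reports the
// stored waker would wake the same task, and the runtime calls `wake_join`
// once this task completes, so the consumer is polled again and can
// `take_output`.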

#[test]
#[cfg(not(loom))]
fn header_lte_cache_line() {
    use std::mem::size_of;

    assert!(size_of::<Header>() <= 8 * size_of::<*const ()>());
}
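
// A companion sanity check (a minimal sketch, assuming `crate::future::Future`
// is the std `Future` trait so `std::future::Ready<()>` qualifies; the
// concrete `T` and `S` are arbitrary stand-ins): verifies that the
// `repr(align)` attributes on `Cell` take effect.
#[test]
#[cfg(not(loom))]
fn cell_is_cache_padded() {
    use std::future::Ready;
    use std::mem::align_of;

    // Architectures that prefetch cache-line pairs are padded to 128 bytes,
    // per the comments on `Cell`.
    #[cfg(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64"
    ))]
    assert!(align_of::<Cell<Ready<()>, ()>>() >= 128);

    // Every architecture listed above gets at least 16-byte alignment.
    assert!(align_of::<Cell<Ready<()>, ()>>() >= 16);
}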