use alloc::alloc::Layout as StdLayout;
use core::cell::UnsafeCell;
use core::future::Future;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

use crate::header::Header;
use crate::state::*;
use crate::utils::{abort, abort_on_panic, max, Layout};
use crate::Runnable;

/// The vtable for a task.
pub(crate) struct TaskVTable {
    /// Schedules the task.
    pub(crate) schedule: unsafe fn(*const ()),

    /// Drops the future inside the task.
    pub(crate) drop_future: unsafe fn(*const ()),

    /// Returns a pointer to the output stored after completion.
    pub(crate) get_output: unsafe fn(*const ()) -> *const (),

    /// Drops the task reference (`Runnable` or `Waker`).
    pub(crate) drop_ref: unsafe fn(ptr: *const ()),

    /// Destroys the task.
    pub(crate) destroy: unsafe fn(*const ()),

    /// Runs the task.
    pub(crate) run: unsafe fn(*const ()) -> bool,

    /// Creates a new waker associated with the task.
    pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker,

    /// The memory layout of the task. This information enables
    /// debuggers to decode raw task memory blobs. Do not remove
    /// the field, even if it appears to be unused.
    #[allow(unused)]
    pub(crate) layout_info: &'static Option<TaskLayout>,
}

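// The vtable erases the concrete `F`, `T`, and `S` types behind plain
// function pointers taking `*const ()`, so `Runnable`, `Task`, and `Waker`
// can stay non-generic. A minimal sketch of the dispatch pattern; the free
// function here is hypothetical, not part of this file:
//
//     unsafe fn run_erased(ptr: *const ()) -> bool {
//         // `ptr` points at the start of the allocation, i.e. the `Header`.
//         let header = &*(ptr as *const Header);
//         // Re-dispatches into the monomorphized `RawTask::<F, T, S>::run`.
//         (header.vtable.run)(ptr)
//     }
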
/// Memory layout of a task.
///
/// This struct contains the following information:
///
/// 1. How to allocate and deallocate the task.
/// 2. How to access the fields inside the task.
#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
    /// Memory layout of the whole task.
    pub(crate) layout: StdLayout,

    /// Offset into the task at which the schedule function is stored.
    pub(crate) offset_s: usize,

    /// Offset into the task at which the future is stored.
    pub(crate) offset_f: usize,

    /// Offset into the task at which the output is stored.
    pub(crate) offset_r: usize,
}

/// Raw pointers to the fields inside a task.
pub(crate) struct RawTask<F, T, S> {
    /// The task header.
    pub(crate) header: *const Header,

    /// The schedule function.
    pub(crate) schedule: *const S,

    /// The future.
    pub(crate) future: *mut F,

    /// The output of the future.
    pub(crate) output: *mut T,
}

impl<F, T, S> Copy for RawTask<F, T, S> {}

impl<F, T, S> Clone for RawTask<F, T, S> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<F, T, S> RawTask<F, T, S> {
    const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout();

    /// Computes the memory layout for a task.
    #[inline]
    const fn eval_task_layout() -> Option<TaskLayout> {
        // Compute the layouts for `Header`, `S`, `F`, and `T`.
        let layout_header = Layout::new::<Header>();
        let layout_s = Layout::new::<S>();
        let layout_f = Layout::new::<F>();
        let layout_r = Layout::new::<T>();

        // Compute the layout for `union { F, T }`.
        let size_union = max(layout_f.size(), layout_r.size());
        let align_union = max(layout_f.align(), layout_r.align());
        let layout_union = Layout::from_size_align(size_union, align_union);

        // Compute the layout for `Header` followed by `S` and `union { F, T }`.
        let layout = layout_header;
        let (layout, offset_s) = leap!(layout.extend(layout_s));
        let (layout, offset_union) = leap!(layout.extend(layout_union));
        let offset_f = offset_union;
        let offset_r = offset_union;

        Some(TaskLayout {
            layout: unsafe { layout.into_std() },
            offset_s,
            offset_f,
            offset_r,
        })
    }
}

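// A worked example of `eval_task_layout`, with illustrative sizes only (the
// real size of `Header` depends on the target and on crate internals):
// suppose `Header` is 24 bytes (align 8), `S` is a pointer-sized closure
// (8 bytes, align 8), `F` is 16 bytes (align 8), and `T` is `u32` (4 bytes,
// align 4). Then:
//
//     union { F, T }: size = max(16, 4) = 16, align = max(8, 4) = 8
//     offset_s = 24               // `S` starts right after `Header`
//     offset_f = offset_r = 32    // the union starts after `S`
//     overall layout: size = 48, align = 8
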
impl<F, T, S> RawTask<F, T, S>
where
    F: Future<Output = T>,
    S: Fn(Runnable),
{
    const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::clone_waker,
        Self::wake,
        Self::wake_by_ref,
        Self::drop_waker,
    );

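    // The four entries above back the standard `Waker` API, so user code
    // reaches these raw functions only through safe calls. The mapping, as
    // defined by `core::task` (shown here for orientation):
    //
    //     waker.wake();           // Self::wake(ptr), consumes the waker
    //     waker.wake_by_ref();    // Self::wake_by_ref(ptr)
    //     waker.clone();          // Self::clone_waker(ptr)
    //     drop(waker);            // Self::drop_waker(ptr)
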
    /// Allocates a task with the given `future` and `schedule` function.
    ///
    /// It is assumed that initially only the `Runnable` and the `Task` exist.
    pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> {
        // Compute the layout of the task for allocation. Abort if the computation fails.
        //
        // n.b. notgull: task_layout now automatically aborts instead of panicking
        let task_layout = Self::task_layout();

        unsafe {
            // Allocate enough space for the entire task.
            let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
                None => abort(),
                Some(p) => p,
            };

            let raw = Self::from_ptr(ptr.as_ptr());

            // Write the header as the first field of the task.
            (raw.header as *mut Header).write(Header {
                state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
                awaiter: UnsafeCell::new(None),
                vtable: &TaskVTable {
                    schedule: Self::schedule,
                    drop_future: Self::drop_future,
                    get_output: Self::get_output,
                    drop_ref: Self::drop_ref,
                    destroy: Self::destroy,
                    run: Self::run,
                    clone_waker: Self::clone_waker,
                    layout_info: &Self::TASK_LAYOUT,
                },
            });
            // Write the schedule function as the second field of the task.
            (raw.schedule as *mut S).write(schedule);

            // Write the future as the third field of the task.
            raw.future.write(future);

            ptr
        }
    }

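    // A minimal sketch of how a higher-level spawn routine might drive
    // `allocate` (hypothetical caller; the real entry points live elsewhere
    // in this crate):
    //
    //     let ptr = RawTask::<F, T, S>::allocate(future, schedule);
    //     // `ptr` now backs both halves of the task:
    //     //   - a `Runnable`, accounted for by the `REFERENCE` bit, and
    //     //   - a `Task` handle, accounted for by the `TASK` bit,
    //     // matching the initial `SCHEDULED | TASK | REFERENCE` state above.
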
    /// Creates a `RawTask` from a raw task pointer.
    #[inline]
    pub(crate) fn from_ptr(ptr: *const ()) -> Self {
        let task_layout = Self::task_layout();
        let p = ptr as *const u8;

        unsafe {
            Self {
                header: p as *const Header,
                schedule: p.add(task_layout.offset_s) as *const S,
                future: p.add(task_layout.offset_f) as *mut F,
                output: p.add(task_layout.offset_r) as *mut T,
            }
        }
    }

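    // The four pointers all alias a single allocation; conceptually:
    //
    //     ptr -> +-----------+  offset 0
    //            | Header    |
    //            +-----------+  offset_s
    //            | S         |
    //            +-----------+  offset_f == offset_r
    //            | F / T     |  union: the future before completion,
    //            +-----------+  the output afterwards
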
    /// Returns the layout of the task.
    #[inline]
    fn task_layout() -> TaskLayout {
        match Self::TASK_LAYOUT {
            Some(tl) => tl,
            None => abort(),
        }
    }

    /// Wakes a waker.
    unsafe fn wake(ptr: *const ()) {
        // This is just an optimization. If the schedule function has captured variables, then
        // we'll do less reference counting if we wake the waker by reference and then drop it.
        if mem::size_of::<S>() > 0 {
            Self::wake_by_ref(ptr);
            Self::drop_waker(ptr);
            return;
        }

        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                // Drop the waker.
                Self::drop_waker(ptr);
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Drop the waker.
                        Self::drop_waker(ptr);
                        break;
                    }
                    Err(s) => state = s,
                }
            } else {
                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state | SCHEDULED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not yet scheduled and isn't currently running, now is the
                        // time to schedule it.
                        if state & RUNNING == 0 {
                            // Schedule the task.
                            Self::schedule(ptr);
                        } else {
                            // Drop the waker.
                            Self::drop_waker(ptr);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

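    // A note on the "no-op" CAS above: `compare_exchange_weak(state, state, ..)`
    // writes back the value it read, but with `AcqRel` success ordering it is
    // still a read-modify-write, so it releases this thread's prior writes to
    // whichever thread later acquires the state and polls the task. A plain
    // load would only give us the acquire half.
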
    /// Wakes a waker by reference.
    unsafe fn wake_by_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            } else {
                // If the task is not running, we can schedule right away.
                let new = if state & RUNNING == 0 {
                    (state | SCHEDULED) + REFERENCE
                } else {
                    state | SCHEDULED
                };

                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not running, now is the time to schedule.
                        if state & RUNNING == 0 {
                            // If the reference count overflowed, abort.
                            if state > isize::MAX as usize {
                                abort();
                            }

                            // Schedule the task. There is no need to call `Self::schedule(ptr)`
                            // because the schedule function cannot be destroyed while the waker is
                            // still alive.
                            let task = Runnable {
                                ptr: NonNull::new_unchecked(ptr as *mut ()),
                            };
                            (*raw.schedule)(task);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

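    // Why `(state | SCHEDULED) + REFERENCE` above: the waker keeps its own
    // reference, so handing a fresh `Runnable` to the schedule function means
    // minting a new one. The reference count is stored in the bits of `state`
    // at and above `REFERENCE`, with the flag bits below it, which is why a
    // plain `+ REFERENCE` bumps the count without touching the flags (and why
    // the count is compared against `isize::MAX` as an overflow guard).
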
    /// Clones a waker.
    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
        let raw = Self::from_ptr(ptr);

        // Increment the reference count. With any kind of reference-counted data structure,
        // relaxed ordering is appropriate when incrementing the counter.
        let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);

        // If the reference count overflowed, abort.
        if state > isize::MAX as usize {
            abort();
        }

        RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
    }

    /// Drops a waker.
    ///
    /// This function will decrement the reference count. If the count drops down to zero, the
    /// associated `Task` handle has been dropped too, and the task has not been completed, then
    /// the task is scheduled one more time so that its future gets dropped by the executor.
    #[inline]
    unsafe fn drop_waker(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `Task` has been dropped too,
        // then we need to decide how to destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
            if new & (COMPLETED | CLOSED) == 0 {
                // If the task was not completed nor closed, close it and schedule one more time so
                // that its future gets dropped by the executor.
                (*raw.header)
                    .state
                    .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
                Self::schedule(ptr);
            } else {
                // Otherwise, destroy the task right away.
                Self::destroy(ptr);
            }
        }
    }

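    // A worked reading of the check above: `new & !(REFERENCE - 1)` masks off
    // every flag bit (they all sit below `REFERENCE`), isolating the reference
    // count, and `new & TASK` tests the `Task` handle bit separately. For
    // example:
    //
    //     new = COMPLETED | CLOSED        // count == 0 and TASK == 0
    //     new & !(REFERENCE - 1) == 0     // no `Runnable`/`Waker` left
    //     new & TASK == 0                 // `Task` handle already dropped
    //     => destroy(ptr)                 // safe to free the allocation
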
    /// Drops a task reference (`Runnable` or `Waker`).
    ///
    /// This function will decrement the reference count. If it drops down to zero and the
    /// associated `Task` handle has been dropped too, then the task gets destroyed.
    #[inline]
    unsafe fn drop_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `Task` has been dropped too,
        // then destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
            Self::destroy(ptr);
        }
    }

    /// Schedules a task for running.
    ///
    /// This function doesn't modify the state of the task. It only passes the task reference to
    /// its schedule function.
    unsafe fn schedule(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // If the schedule function has captured variables, create a temporary waker that prevents
        // the task from getting deallocated while the function is being invoked.
        let _waker;
        if mem::size_of::<S>() > 0 {
            _waker = Waker::from_raw(Self::clone_waker(ptr));
        }

        let task = Runnable {
            ptr: NonNull::new_unchecked(ptr as *mut ()),
        };
        (*raw.schedule)(task);
    }

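    // The temporary waker above is a keep-alive: `(*raw.schedule)(task)` runs
    // user code, and if that code immediately drops or runs the `Runnable`,
    // the reference count could hit zero and free the allocation that
    // `raw.schedule` itself points into. When `S` is zero-sized, the call
    // reads no captured state out of the allocation, so the guard is skipped.
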
    /// Drops the future inside a task.
    #[inline]
    unsafe fn drop_future(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // We need a safeguard against panics because the destructor can panic.
        abort_on_panic(|| {
            raw.future.drop_in_place();
        })
    }

    /// Returns a pointer to the output inside a task.
    unsafe fn get_output(ptr: *const ()) -> *const () {
        let raw = Self::from_ptr(ptr);
        raw.output as *const ()
    }

    /// Cleans up the task's resources and deallocates it.
    ///
    /// The schedule function will be dropped, and the task will then get deallocated.
    /// The task must be closed before this function is called.
    #[inline]
    unsafe fn destroy(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);
        let task_layout = Self::task_layout();

        // We need a safeguard against panics because destructors can panic.
        abort_on_panic(|| {
            // Drop the schedule function.
            (raw.schedule as *mut S).drop_in_place();
        });

        // Finally, deallocate the memory reserved by the task.
        alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
    }

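    // Note that `destroy` drops only the schedule function: by the time the
    // reference count reaches zero, the `union { F, T }` slot has already been
    // emptied (the future via `drop_future`; the output either taken through
    // `get_output` or dropped when the task was closed), so all that remains
    // is freeing the block.
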
    /// Runs a task.
    ///
    /// If polling its future panics, the task will be closed and the panic will be propagated
    /// into the caller.
    unsafe fn run(ptr: *const ()) -> bool {
        let raw = Self::from_ptr(ptr);

        // Create a context from the raw task pointer and the vtable inside its header.
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
        let cx = &mut Context::from_waker(&waker);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        // Update the task's state before polling its future.
        loop {
            // If the task has already been closed, drop the task reference and return.
            if state & CLOSED != 0 {
                // Drop the future.
                Self::drop_future(ptr);

                // Mark the task as unscheduled.
                let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);

                // Take the awaiter out.
                let mut awaiter = None;
                if state & AWAITER != 0 {
                    awaiter = (*raw.header).take(None);
                }

                // Drop the task reference.
                Self::drop_ref(ptr);

                // Notify the awaiter that the future has been dropped.
                if let Some(w) = awaiter {
                    abort_on_panic(|| w.wake());
                }
                return false;
            }

            // Mark the task as unscheduled and running.
            match (*raw.header).state.compare_exchange_weak(
                state,
                (state & !SCHEDULED) | RUNNING,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Update the state because we're continuing with polling the future.
                    state = (state & !SCHEDULED) | RUNNING;
                    break;
                }
                Err(s) => state = s,
            }
        }

        // Poll the inner future, but surround it with a guard that closes the task in case polling
        // panics.
        let guard = Guard(raw);
        let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
        mem::forget(guard);

        match poll {
            Poll::Ready(out) => {
                // Replace the future with its output.
                Self::drop_future(ptr);
                raw.output.write(out);

                // The task is now completed.
                loop {
                    // If the `Task` is dropped, we'll need to close it and drop the output.
                    let new = if state & TASK == 0 {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
                    } else {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED
                    };

                    // Mark the task as not running and completed.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // If the `Task` is dropped or if the task was closed while running,
                            // now it's time to drop the output.
                            if state & TASK == 0 || state & CLOSED != 0 {
                                // Drop the output.
                                abort_on_panic(|| raw.output.drop_in_place());
                            }

                            // Take the awaiter out.
                            let mut awaiter = None;
                            if state & AWAITER != 0 {
                                awaiter = (*raw.header).take(None);
                            }

                            // Drop the task reference.
                            Self::drop_ref(ptr);

                            // Notify the awaiter that the future has been dropped.
                            if let Some(w) = awaiter {
                                abort_on_panic(|| w.wake());
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
            Poll::Pending => {
                let mut future_dropped = false;

                // The task is still not completed.
                loop {
                    // If the task was closed while running, we'll need to unschedule in case it
                    // was woken up and then destroy it.
                    let new = if state & CLOSED != 0 {
                        state & !RUNNING & !SCHEDULED
                    } else {
                        state & !RUNNING
                    };

                    if state & CLOSED != 0 && !future_dropped {
                        // The thread that closed the task didn't drop the future because it was
                        // running, so now it's our responsibility to do so.
                        Self::drop_future(ptr);
                        future_dropped = true;
                    }

                    // Mark the task as not running.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(state) => {
                            // If the task was closed while running, we need to notify the awaiter.
                            // If the task was woken up while running, we need to schedule it.
                            // Otherwise, we just drop the task reference.
                            if state & CLOSED != 0 {
                                // Take the awaiter out.
                                let mut awaiter = None;
                                if state & AWAITER != 0 {
                                    awaiter = (*raw.header).take(None);
                                }

                                // Drop the task reference.
                                Self::drop_ref(ptr);

                                // Notify the awaiter that the future has been dropped.
                                if let Some(w) = awaiter {
                                    abort_on_panic(|| w.wake());
                                }
                            } else if state & SCHEDULED != 0 {
                                // The thread that woke the task up didn't reschedule it because
                                // it was running, so now it's our responsibility to do so.
                                Self::schedule(ptr);
                                return true;
                            } else {
                                // Drop the task reference.
                                Self::drop_ref(ptr);
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
        }

        return false;

        /// A guard that closes the task if polling its future panics.
        struct Guard<F, T, S>(RawTask<F, T, S>)
        where
            F: Future<Output = T>,
            S: Fn(Runnable);

        impl<F, T, S> Drop for Guard<F, T, S>
        where
            F: Future<Output = T>,
            S: Fn(Runnable),
        {
            fn drop(&mut self) {
                let raw = self.0;
                let ptr = raw.header as *const ();

                unsafe {
                    let mut state = (*raw.header).state.load(Ordering::Acquire);

                    loop {
                        // If the task was closed while running, then unschedule it, drop its
                        // future, and drop the task reference.
                        if state & CLOSED != 0 {
                            // The thread that closed the task didn't drop the future because it
                            // was running, so now it's our responsibility to do so.
                            RawTask::<F, T, S>::drop_future(ptr);

                            // Mark the task as not running and not scheduled.
                            (*raw.header)
                                .state
                                .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);

                            // Take the awaiter out.
                            let mut awaiter = None;
                            if state & AWAITER != 0 {
                                awaiter = (*raw.header).take(None);
                            }

                            // Drop the task reference.
                            RawTask::<F, T, S>::drop_ref(ptr);

                            // Notify the awaiter that the future has been dropped.
                            if let Some(w) = awaiter {
                                abort_on_panic(|| w.wake());
                            }
                            break;
                        }

                        // Mark the task as not running, not scheduled, and closed.
                        match (*raw.header).state.compare_exchange_weak(
                            state,
                            (state & !RUNNING & !SCHEDULED) | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(state) => {
                                // Drop the future because the task is now closed.
                                RawTask::<F, T, S>::drop_future(ptr);

                                // Take the awaiter out.
                                let mut awaiter = None;
                                if state & AWAITER != 0 {
                                    awaiter = (*raw.header).take(None);
                                }

                                // Drop the task reference.
                                RawTask::<F, T, S>::drop_ref(ptr);

                                // Notify the awaiter that the future has been dropped.
                                if let Some(w) = awaiter {
                                    abort_on_panic(|| w.wake());
                                }
                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }
        }
    }
}
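
// A summary of the transitions `run` drives, as implemented above:
//
//     SCHEDULED --run--> RUNNING
//     RUNNING --poll Ready--> COMPLETED (plus CLOSED, with the output
//         dropped, if the `Task` handle is gone or the task was closed)
//     RUNNING --poll Pending, woken meanwhile--> SCHEDULED again
//     RUNNING --poll Pending--> idle (neither RUNNING nor SCHEDULED)
//     RUNNING --poll panics--> CLOSED (future dropped by `Guard`)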