• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 use alloc::alloc::Layout;
2 use core::cell::UnsafeCell;
3 use core::future::Future;
4 use core::mem::{self, ManuallyDrop};
5 use core::pin::Pin;
6 use core::ptr::NonNull;
7 use core::sync::atomic::{AtomicUsize, Ordering};
8 use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
9 
10 use crate::header::Header;
11 use crate::state::*;
12 use crate::utils::{abort, abort_on_panic, extend};
13 use crate::Runnable;
14 
/// Table of type-erased entry points for one concrete task type.
///
/// Every entry receives the raw task pointer as `*const ()`.
pub(crate) struct TaskVTable {
    /// Hands the task to its schedule function.
    pub(crate) schedule: unsafe fn(*const ()),

    /// Drops the future stored inside the task.
    pub(crate) drop_future: unsafe fn(*const ()),

    /// Returns a pointer to the slot that holds the output after completion.
    pub(crate) get_output: unsafe fn(*const ()) -> *const (),

    /// Releases one task reference (a `Runnable` or a `Waker`).
    pub(crate) drop_ref: unsafe fn(*const ()),

    /// Destroys the task.
    pub(crate) destroy: unsafe fn(*const ()),

    /// Runs the task.
    pub(crate) run: unsafe fn(*const ()) -> bool,

    /// Creates a new waker associated with the task.
    pub(crate) clone_waker: unsafe fn(*const ()) -> RawWaker,
}
38 
/// Describes how a task is laid out in memory.
///
/// It answers two questions:
///
/// 1. Which `Layout` to use when allocating and deallocating the task.
/// 2. At which byte offsets the individual fields of the task live.
#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
    /// Layout of the task allocation as a whole.
    pub(crate) layout: Layout,

    /// Byte offset of the schedule function within the task.
    pub(crate) offset_s: usize,

    /// Byte offset of the future within the task.
    pub(crate) offset_f: usize,

    /// Byte offset of the output within the task.
    pub(crate) offset_r: usize,
}
59 
60 /// Raw pointers to the fields inside a task.
61 pub(crate) struct RawTask<F, T, S> {
62     /// The task header.
63     pub(crate) header: *const Header,
64 
65     /// The schedule function.
66     pub(crate) schedule: *const S,
67 
68     /// The future.
69     pub(crate) future: *mut F,
70 
71     /// The output of the future.
72     pub(crate) output: *mut T,
73 }
74 
// `RawTask` only holds raw pointers, so it is `Copy` for every `F`, `T`, and `S`. The impls are
// written by hand because `#[derive(Clone, Copy)]` would add `Clone`/`Copy` bounds on the type
// parameters themselves.
impl<F, T, S> Copy for RawTask<F, T, S> {}

impl<F, T, S> Clone for RawTask<F, T, S> {
    /// Returns a bitwise copy of the pointer bundle; the task itself is not duplicated.
    fn clone(&self) -> Self {
        *self
    }
}
82 
83 impl<F, T, S> RawTask<F, T, S>
84 where
85     F: Future<Output = T>,
86     S: Fn(Runnable),
87 {
    /// The waker vtable used for every `RawWaker` created for this task type.
    ///
    /// The entries are, in order: clone, wake, wake-by-reference, and drop.
    const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::clone_waker,
        Self::wake,
        Self::wake_by_ref,
        Self::drop_waker,
    );
94 
95     /// Allocates a task with the given `future` and `schedule` function.
96     ///
97     /// It is assumed that initially only the `Runnable` and the `Task` exist.
allocate(future: F, schedule: S) -> NonNull<()>98     pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> {
99         // Compute the layout of the task for allocation. Abort if the computation fails.
100         let task_layout = abort_on_panic(|| Self::task_layout());
101 
102         unsafe {
103             // Allocate enough space for the entire task.
104             let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
105                 None => abort(),
106                 Some(p) => p,
107             };
108 
109             let raw = Self::from_ptr(ptr.as_ptr());
110 
111             // Write the header as the first field of the task.
112             (raw.header as *mut Header).write(Header {
113                 state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
114                 awaiter: UnsafeCell::new(None),
115                 vtable: &TaskVTable {
116                     schedule: Self::schedule,
117                     drop_future: Self::drop_future,
118                     get_output: Self::get_output,
119                     drop_ref: Self::drop_ref,
120                     destroy: Self::destroy,
121                     run: Self::run,
122                     clone_waker: Self::clone_waker,
123                 },
124             });
125 
126             // Write the schedule function as the third field of the task.
127             (raw.schedule as *mut S).write(schedule);
128 
129             // Write the future as the fourth field of the task.
130             raw.future.write(future);
131 
132             ptr
133         }
134     }
135 
136     /// Creates a `RawTask` from a raw task pointer.
137     #[inline]
from_ptr(ptr: *const ()) -> Self138     pub(crate) fn from_ptr(ptr: *const ()) -> Self {
139         let task_layout = Self::task_layout();
140         let p = ptr as *const u8;
141 
142         unsafe {
143             Self {
144                 header: p as *const Header,
145                 schedule: p.add(task_layout.offset_s) as *const S,
146                 future: p.add(task_layout.offset_f) as *mut F,
147                 output: p.add(task_layout.offset_r) as *mut T,
148             }
149         }
150     }
151 
152     /// Returns the memory layout for a task.
153     #[inline]
task_layout() -> TaskLayout154     fn task_layout() -> TaskLayout {
155         // Compute the layouts for `Header`, `S`, `F`, and `T`.
156         let layout_header = Layout::new::<Header>();
157         let layout_s = Layout::new::<S>();
158         let layout_f = Layout::new::<F>();
159         let layout_r = Layout::new::<T>();
160 
161         // Compute the layout for `union { F, T }`.
162         let size_union = layout_f.size().max(layout_r.size());
163         let align_union = layout_f.align().max(layout_r.align());
164         let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };
165 
166         // Compute the layout for `Header` followed `S` and `union { F, T }`.
167         let layout = layout_header;
168         let (layout, offset_s) = extend(layout, layout_s);
169         let (layout, offset_union) = extend(layout, layout_union);
170         let offset_f = offset_union;
171         let offset_r = offset_union;
172 
173         TaskLayout {
174             layout,
175             offset_s,
176             offset_f,
177             offset_r,
178         }
179     }
180 
    /// Wakes the task, consuming the waker that `ptr` belongs to.
    ///
    /// Every path through this function gives up the waker's reference: it is either released
    /// with `Self::drop_waker` or handed over to `Self::schedule`.
    ///
    /// `ptr` is dereferenced as a task header, so it must point to a live task of this type.
    unsafe fn wake(ptr: *const ()) {
        // This is just an optimization. If the schedule function has captured variables, then
        // we'll do less reference counting if we wake the waker by reference and then drop it.
        if mem::size_of::<S>() > 0 {
            Self::wake_by_ref(ptr);
            Self::drop_waker(ptr);
            return;
        }

        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        // Retry until one of the compare-exchange operations succeeds or the task turns out to be
        // completed or closed. A failed exchange yields the current state for the next attempt.
        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                // Drop the waker.
                Self::drop_waker(ptr);
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Drop the waker.
                        Self::drop_waker(ptr);
                        break;
                    }
                    Err(s) => state = s,
                }
            } else {
                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state | SCHEDULED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not yet scheduled and isn't currently running, now is the
                        // time to schedule it.
                        if state & RUNNING == 0 {
                            // Schedule the task. The reference count is not touched here: the
                            // waker's reference is passed on to the scheduled task.
                            Self::schedule(ptr);
                        } else {
                            // The task is running; only the `SCHEDULED` flag is left behind.
                            // Drop the waker.
                            Self::drop_waker(ptr);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }
246 
    /// Wakes the task without consuming the waker that `ptr` belongs to.
    ///
    /// When the task actually gets scheduled from here, the reference count is incremented in
    /// the same atomic update that sets `SCHEDULED`, and a new `Runnable` is passed to the
    /// schedule function. The process is aborted if the reference count overflows.
    ///
    /// `ptr` is dereferenced as a task header, so it must point to a live task of this type.
    unsafe fn wake_by_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        // Retry until one of the compare-exchange operations succeeds or the task turns out to be
        // completed or closed. A failed exchange yields the current state for the next attempt.
        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            } else {
                // If the task is not running, we can schedule right away. In that case the new
                // state also accounts for the reference held by the `Runnable` created below.
                let new = if state & RUNNING == 0 {
                    (state | SCHEDULED) + REFERENCE
                } else {
                    state | SCHEDULED
                };

                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not running, now is the time to schedule.
                        if state & RUNNING == 0 {
                            // If the reference count overflowed, abort.
                            if state > isize::max_value() as usize {
                                abort();
                            }

                            // Schedule the task. There is no need to call `Self::schedule(ptr)`
                            // because the schedule function cannot be destroyed while the waker is
                            // still alive.
                            let task = Runnable {
                                ptr: NonNull::new_unchecked(ptr as *mut ()),
                            };
                            (*raw.schedule)(task);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }
311 
312     /// Clones a waker.
clone_waker(ptr: *const ()) -> RawWaker313     unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
314         let raw = Self::from_ptr(ptr);
315 
316         // Increment the reference count. With any kind of reference-counted data structure,
317         // relaxed ordering is appropriate when incrementing the counter.
318         let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);
319 
320         // If the reference count overflowed, abort.
321         if state > isize::max_value() as usize {
322             abort();
323         }
324 
325         RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
326     }
327 
328     /// Drops a waker.
329     ///
330     /// This function will decrement the reference count. If it drops down to zero, the associated
331     /// `Task` has been dropped too, and the task has not been completed, then it will get
332     /// scheduled one more time so that its future gets dropped by the executor.
333     #[inline]
drop_waker(ptr: *const ())334     unsafe fn drop_waker(ptr: *const ()) {
335         let raw = Self::from_ptr(ptr);
336 
337         // Decrement the reference count.
338         let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
339 
340         // If this was the last reference to the task and the `Task` has been dropped too,
341         // then we need to decide how to destroy the task.
342         if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
343             if new & (COMPLETED | CLOSED) == 0 {
344                 // If the task was not completed nor closed, close it and schedule one more time so
345                 // that its future gets dropped by the executor.
346                 (*raw.header)
347                     .state
348                     .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
349                 Self::schedule(ptr);
350             } else {
351                 // Otherwise, destroy the task right away.
352                 Self::destroy(ptr);
353             }
354         }
355     }
356 
357     /// Drops a task reference (`Runnable` or `Waker`).
358     ///
359     /// This function will decrement the reference count. If it drops down to zero and the
360     /// associated `Task` handle has been dropped too, then the task gets destroyed.
361     #[inline]
drop_ref(ptr: *const ())362     unsafe fn drop_ref(ptr: *const ()) {
363         let raw = Self::from_ptr(ptr);
364 
365         // Decrement the reference count.
366         let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
367 
368         // If this was the last reference to the task and the `Task` has been dropped too,
369         // then destroy the task.
370         if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
371             Self::destroy(ptr);
372         }
373     }
374 
375     /// Schedules a task for running.
376     ///
377     /// This function doesn't modify the state of the task. It only passes the task reference to
378     /// its schedule function.
schedule(ptr: *const ())379     unsafe fn schedule(ptr: *const ()) {
380         let raw = Self::from_ptr(ptr);
381 
382         // If the schedule function has captured variables, create a temporary waker that prevents
383         // the task from getting deallocated while the function is being invoked.
384         let _waker;
385         if mem::size_of::<S>() > 0 {
386             _waker = Waker::from_raw(Self::clone_waker(ptr));
387         }
388 
389         let task = Runnable {
390             ptr: NonNull::new_unchecked(ptr as *mut ()),
391         };
392         (*raw.schedule)(task);
393     }
394 
395     /// Drops the future inside a task.
396     #[inline]
drop_future(ptr: *const ())397     unsafe fn drop_future(ptr: *const ()) {
398         let raw = Self::from_ptr(ptr);
399 
400         // We need a safeguard against panics because the destructor can panic.
401         abort_on_panic(|| {
402             raw.future.drop_in_place();
403         })
404     }
405 
406     /// Returns a pointer to the output inside a task.
get_output(ptr: *const ()) -> *const ()407     unsafe fn get_output(ptr: *const ()) -> *const () {
408         let raw = Self::from_ptr(ptr);
409         raw.output as *const ()
410     }
411 
412     /// Cleans up task's resources and deallocates it.
413     ///
414     /// The schedule function will be dropped, and the task will then get deallocated.
415     /// The task must be closed before this function is called.
416     #[inline]
destroy(ptr: *const ())417     unsafe fn destroy(ptr: *const ()) {
418         let raw = Self::from_ptr(ptr);
419         let task_layout = Self::task_layout();
420 
421         // We need a safeguard against panics because destructors can panic.
422         abort_on_panic(|| {
423             // Drop the schedule function.
424             (raw.schedule as *mut S).drop_in_place();
425         });
426 
427         // Finally, deallocate the memory reserved by the task.
428         alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
429     }
430 
431     /// Runs a task.
432     ///
433     /// If polling its future panics, the task will be closed and the panic will be propagated into
434     /// the caller.
run(ptr: *const ()) -> bool435     unsafe fn run(ptr: *const ()) -> bool {
436         let raw = Self::from_ptr(ptr);
437 
438         // Create a context from the raw task pointer and the vtable inside the its header.
439         let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
440         let cx = &mut Context::from_waker(&waker);
441 
442         let mut state = (*raw.header).state.load(Ordering::Acquire);
443 
444         // Update the task's state before polling its future.
445         loop {
446             // If the task has already been closed, drop the task reference and return.
447             if state & CLOSED != 0 {
448                 // Drop the future.
449                 Self::drop_future(ptr);
450 
451                 // Mark the task as unscheduled.
452                 let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
453 
454                 // Take the awaiter out.
455                 let mut awaiter = None;
456                 if state & AWAITER != 0 {
457                     awaiter = (*raw.header).take(None);
458                 }
459 
460                 // Drop the task reference.
461                 Self::drop_ref(ptr);
462 
463                 // Notify the awaiter that the future has been dropped.
464                 if let Some(w) = awaiter {
465                     abort_on_panic(|| w.wake());
466                 }
467                 return false;
468             }
469 
470             // Mark the task as unscheduled and running.
471             match (*raw.header).state.compare_exchange_weak(
472                 state,
473                 (state & !SCHEDULED) | RUNNING,
474                 Ordering::AcqRel,
475                 Ordering::Acquire,
476             ) {
477                 Ok(_) => {
478                     // Update the state because we're continuing with polling the future.
479                     state = (state & !SCHEDULED) | RUNNING;
480                     break;
481                 }
482                 Err(s) => state = s,
483             }
484         }
485 
486         // Poll the inner future, but surround it with a guard that closes the task in case polling
487         // panics.
488         let guard = Guard(raw);
489         let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
490         mem::forget(guard);
491 
492         match poll {
493             Poll::Ready(out) => {
494                 // Replace the future with its output.
495                 Self::drop_future(ptr);
496                 raw.output.write(out);
497 
498                 // The task is now completed.
499                 loop {
500                     // If the `Task` is dropped, we'll need to close it and drop the output.
501                     let new = if state & TASK == 0 {
502                         (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
503                     } else {
504                         (state & !RUNNING & !SCHEDULED) | COMPLETED
505                     };
506 
507                     // Mark the task as not running and completed.
508                     match (*raw.header).state.compare_exchange_weak(
509                         state,
510                         new,
511                         Ordering::AcqRel,
512                         Ordering::Acquire,
513                     ) {
514                         Ok(_) => {
515                             // If the `Task` is dropped or if the task was closed while running,
516                             // now it's time to drop the output.
517                             if state & TASK == 0 || state & CLOSED != 0 {
518                                 // Drop the output.
519                                 abort_on_panic(|| raw.output.drop_in_place());
520                             }
521 
522                             // Take the awaiter out.
523                             let mut awaiter = None;
524                             if state & AWAITER != 0 {
525                                 awaiter = (*raw.header).take(None);
526                             }
527 
528                             // Drop the task reference.
529                             Self::drop_ref(ptr);
530 
531                             // Notify the awaiter that the future has been dropped.
532                             if let Some(w) = awaiter {
533                                 abort_on_panic(|| w.wake());
534                             }
535                             break;
536                         }
537                         Err(s) => state = s,
538                     }
539                 }
540             }
541             Poll::Pending => {
542                 let mut future_dropped = false;
543 
544                 // The task is still not completed.
545                 loop {
546                     // If the task was closed while running, we'll need to unschedule in case it
547                     // was woken up and then destroy it.
548                     let new = if state & CLOSED != 0 {
549                         state & !RUNNING & !SCHEDULED
550                     } else {
551                         state & !RUNNING
552                     };
553 
554                     if state & CLOSED != 0 && !future_dropped {
555                         // The thread that closed the task didn't drop the future because it was
556                         // running so now it's our responsibility to do so.
557                         Self::drop_future(ptr);
558                         future_dropped = true;
559                     }
560 
561                     // Mark the task as not running.
562                     match (*raw.header).state.compare_exchange_weak(
563                         state,
564                         new,
565                         Ordering::AcqRel,
566                         Ordering::Acquire,
567                     ) {
568                         Ok(state) => {
569                             // If the task was closed while running, we need to notify the awaiter.
570                             // If the task was woken up while running, we need to schedule it.
571                             // Otherwise, we just drop the task reference.
572                             if state & CLOSED != 0 {
573                                 // Take the awaiter out.
574                                 let mut awaiter = None;
575                                 if state & AWAITER != 0 {
576                                     awaiter = (*raw.header).take(None);
577                                 }
578 
579                                 // Drop the task reference.
580                                 Self::drop_ref(ptr);
581 
582                                 // Notify the awaiter that the future has been dropped.
583                                 if let Some(w) = awaiter {
584                                     abort_on_panic(|| w.wake());
585                                 }
586                             } else if state & SCHEDULED != 0 {
587                                 // The thread that woke the task up didn't reschedule it because
588                                 // it was running so now it's our responsibility to do so.
589                                 Self::schedule(ptr);
590                                 return true;
591                             } else {
592                                 // Drop the task reference.
593                                 Self::drop_ref(ptr);
594                             }
595                             break;
596                         }
597                         Err(s) => state = s,
598                     }
599                 }
600             }
601         }
602 
603         return false;
604 
605         /// A guard that closes the task if polling its future panics.
606         struct Guard<F, T, S>(RawTask<F, T, S>)
607         where
608             F: Future<Output = T>,
609             S: Fn(Runnable);
610 
611         impl<F, T, S> Drop for Guard<F, T, S>
612         where
613             F: Future<Output = T>,
614             S: Fn(Runnable),
615         {
616             fn drop(&mut self) {
617                 let raw = self.0;
618                 let ptr = raw.header as *const ();
619 
620                 unsafe {
621                     let mut state = (*raw.header).state.load(Ordering::Acquire);
622 
623                     loop {
624                         // If the task was closed while running, then unschedule it, drop its
625                         // future, and drop the task reference.
626                         if state & CLOSED != 0 {
627                             // The thread that closed the task didn't drop the future because it
628                             // was running so now it's our responsibility to do so.
629                             RawTask::<F, T, S>::drop_future(ptr);
630 
631                             // Mark the task as not running and not scheduled.
632                             (*raw.header)
633                                 .state
634                                 .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);
635 
636                             // Take the awaiter out.
637                             let mut awaiter = None;
638                             if state & AWAITER != 0 {
639                                 awaiter = (*raw.header).take(None);
640                             }
641 
642                             // Drop the task reference.
643                             RawTask::<F, T, S>::drop_ref(ptr);
644 
645                             // Notify the awaiter that the future has been dropped.
646                             if let Some(w) = awaiter {
647                                 abort_on_panic(|| w.wake());
648                             }
649                             break;
650                         }
651 
652                         // Mark the task as not running, not scheduled, and closed.
653                         match (*raw.header).state.compare_exchange_weak(
654                             state,
655                             (state & !RUNNING & !SCHEDULED) | CLOSED,
656                             Ordering::AcqRel,
657                             Ordering::Acquire,
658                         ) {
659                             Ok(state) => {
660                                 // Drop the future because the task is now closed.
661                                 RawTask::<F, T, S>::drop_future(ptr);
662 
663                                 // Take the awaiter out.
664                                 let mut awaiter = None;
665                                 if state & AWAITER != 0 {
666                                     awaiter = (*raw.header).take(None);
667                                 }
668 
669                                 // Drop the task reference.
670                                 RawTask::<F, T, S>::drop_ref(ptr);
671 
672                                 // Notify the awaiter that the future has been dropped.
673                                 if let Some(w) = awaiter {
674                                     abort_on_panic(|| w.wake());
675                                 }
676                                 break;
677                             }
678                             Err(s) => state = s,
679                         }
680                     }
681                 }
682             }
683         }
684     }
685 }
686