use alloc::alloc::Layout as StdLayout;
use core::cell::UnsafeCell;
use core::future::Future;
use core::marker::PhantomData;
use core::mem::{self, ManuallyDrop};
use core::pin::Pin;
use core::ptr::NonNull;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering;
#[cfg(feature = "portable-atomic")]
use portable_atomic::AtomicUsize;

use crate::header::Header;
use crate::runnable::{Schedule, ScheduleInfo};
use crate::state::*;
use crate::utils::{abort, abort_on_panic, max, Layout};
use crate::Runnable;

#[cfg(feature = "std")]
pub(crate) type Panic = alloc::boxed::Box<dyn core::any::Any + Send + 'static>;

#[cfg(not(feature = "std"))]
pub(crate) type Panic = core::convert::Infallible;

/// The vtable for a task.
pub(crate) struct TaskVTable {
    /// Schedules the task.
    pub(crate) schedule: unsafe fn(*const (), ScheduleInfo),

    /// Drops the future inside the task.
    pub(crate) drop_future: unsafe fn(*const ()),

    /// Returns a pointer to the output stored after completion.
    pub(crate) get_output: unsafe fn(*const ()) -> *const (),

    /// Drops the task reference (`Runnable` or `Waker`).
    pub(crate) drop_ref: unsafe fn(ptr: *const ()),

    /// Destroys the task.
    pub(crate) destroy: unsafe fn(*const ()),

    /// Runs the task.
    pub(crate) run: unsafe fn(*const ()) -> bool,

    /// Creates a new waker associated with the task.
    pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker,

    /// The memory layout of the task. This information enables
    /// debuggers to decode raw task memory blobs. Do not remove
    /// the field, even if it appears to be unused.
    #[allow(unused)]
    pub(crate) layout_info: &'static Option<TaskLayout>,
}

/// Memory layout of a task.
///
/// This struct contains the following information:
///
/// 1. How to allocate and deallocate the task.
/// 2. How to access the fields inside the task.
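///
/// The task is laid out as the `Header<M>`, followed by the schedule function
/// `S`, followed by a union of the future `F` and its output `Result<T, Panic>`.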
#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
    /// Memory layout of the whole task.
    pub(crate) layout: StdLayout,

    /// Offset into the task at which the schedule function is stored.
    pub(crate) offset_s: usize,

    /// Offset into the task at which the future is stored.
    pub(crate) offset_f: usize,

    /// Offset into the task at which the output is stored.
    pub(crate) offset_r: usize,
}

/// Raw pointers to the fields inside a task.
pub(crate) struct RawTask<F, T, S, M> {
    /// The task header.
    pub(crate) header: *const Header<M>,

    /// The schedule function.
    pub(crate) schedule: *const S,

    /// The future.
    pub(crate) future: *mut F,

    /// The output of the future.
    pub(crate) output: *mut Result<T, Panic>,
}

impl<F, T, S, M> Copy for RawTask<F, T, S, M> {}

impl<F, T, S, M> Clone for RawTask<F, T, S, M> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<F, T, S, M> RawTask<F, T, S, M> {
    const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout();

    /// Computes the memory layout for a task.
    #[inline]
    const fn eval_task_layout() -> Option<TaskLayout> {
        // Compute the layouts for `Header`, `S`, `F`, and `T`.
        let layout_header = Layout::new::<Header<M>>();
        let layout_s = Layout::new::<S>();
        let layout_f = Layout::new::<F>();
        let layout_r = Layout::new::<Result<T, Panic>>();

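        // The future and its output never exist at the same time: the future is
        // dropped before the output is written, so the two can share storage.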
        // Compute the layout for `union { F, T }`.
        let size_union = max(layout_f.size(), layout_r.size());
        let align_union = max(layout_f.align(), layout_r.align());
        let layout_union = Layout::from_size_align(size_union, align_union);

        // Compute the layout for `Header` followed by `S` and `union { F, T }`.
        let layout = layout_header;
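        // `leap!` propagates a failed (`None`) layout computation by returning
        // `None` early, which in turn makes `task_layout()` abort.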
        let (layout, offset_s) = leap!(layout.extend(layout_s));
        let (layout, offset_union) = leap!(layout.extend(layout_union));
        let offset_f = offset_union;
        let offset_r = offset_union;

        Some(TaskLayout {
            layout: unsafe { layout.into_std() },
            offset_s,
            offset_f,
            offset_r,
        })
    }
}

impl<F, T, S, M> RawTask<F, T, S, M>
where
    F: Future<Output = T>,
    S: Schedule<M>,
{
    const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::clone_waker,
        Self::wake,
        Self::wake_by_ref,
        Self::drop_waker,
    );

    /// Allocates a task with the given `future` and `schedule` function.
    ///
    /// It is assumed that initially only the `Runnable` and the `Task` exist.
    pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>(
        future: Gen,
        schedule: S,
        builder: crate::Builder<M>,
    ) -> NonNull<()>
    where
        F: 'a,
        M: 'a,
    {
        // Compute the layout of the task for allocation. Abort if the computation fails.
        //
        // n.b. notgull: task_layout now automatically aborts instead of panicking
        let task_layout = Self::task_layout();

        unsafe {
            // Allocate enough space for the entire task.
            let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
                None => abort(),
                Some(p) => p,
            };

            let raw = Self::from_ptr(ptr.as_ptr());

            let crate::Builder {
                metadata,
                #[cfg(feature = "std")]
                propagate_panic,
            } = builder;

            // Write the header as the first field of the task.
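            // The task starts out scheduled, with the `TASK` flag set for the
            // `Task` handle and a reference count of one for the `Runnable`.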
            (raw.header as *mut Header<M>).write(Header {
                state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
                awaiter: UnsafeCell::new(None),
                vtable: &TaskVTable {
                    schedule: Self::schedule,
                    drop_future: Self::drop_future,
                    get_output: Self::get_output,
                    drop_ref: Self::drop_ref,
                    destroy: Self::destroy,
                    run: Self::run,
                    clone_waker: Self::clone_waker,
                    layout_info: &Self::TASK_LAYOUT,
                },
                metadata,
                #[cfg(feature = "std")]
                propagate_panic,
            });

            // Write the schedule function as the second field of the task.
            (raw.schedule as *mut S).write(schedule);

            // Generate the future, now that the metadata has been pinned in place.
            let future = abort_on_panic(|| future(&(*raw.header).metadata));

            // Write the future as the third field of the task.
            raw.future.write(future);

            ptr
        }
    }

    /// Creates a `RawTask` from a raw task pointer.
    #[inline]
    pub(crate) fn from_ptr(ptr: *const ()) -> Self {
        let task_layout = Self::task_layout();
        let p = ptr as *const u8;

        unsafe {
            Self {
                header: p as *const Header<M>,
                schedule: p.add(task_layout.offset_s) as *const S,
                future: p.add(task_layout.offset_f) as *mut F,
                output: p.add(task_layout.offset_r) as *mut Result<T, Panic>,
            }
        }
    }

    /// Returns the layout of the task.
    #[inline]
    fn task_layout() -> TaskLayout {
        match Self::TASK_LAYOUT {
            Some(tl) => tl,
            None => abort(),
        }
    }

    /// Wakes a waker.
    unsafe fn wake(ptr: *const ()) {
        // This is just an optimization. If the schedule function has captured variables, then
        // we'll do less reference counting if we wake the waker by reference and then drop it.
        if mem::size_of::<S>() > 0 {
            Self::wake_by_ref(ptr);
            Self::drop_waker(ptr);
            return;
        }

        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                // Drop the waker.
                Self::drop_waker(ptr);
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // Drop the waker.
                        Self::drop_waker(ptr);
                        break;
                    }
                    Err(s) => state = s,
                }
            } else {
                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state | SCHEDULED,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not yet scheduled and isn't currently running, now is the
                        // time to schedule it.
                        if state & RUNNING == 0 {
                            // Schedule the task.
                            Self::schedule(ptr, ScheduleInfo::new(false));
                        } else {
                            // Drop the waker.
                            Self::drop_waker(ptr);
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Wakes a waker by reference.
    unsafe fn wake_by_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        loop {
            // If the task is completed or closed, it can't be woken up.
            if state & (COMPLETED | CLOSED) != 0 {
                break;
            }

            // If the task is already scheduled, we just need to synchronize with the thread that
            // will run the task by "publishing" our current view of the memory.
            if state & SCHEDULED != 0 {
                // Update the state without actually modifying it.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    state,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => break,
                    Err(s) => state = s,
                }
            } else {
                // If the task is not running, we can schedule right away.
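                // Scheduling will hand out a new `Runnable`, so the reference
                // count is bumped by `REFERENCE` as part of the same update.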
                let new = if state & RUNNING == 0 {
                    (state | SCHEDULED) + REFERENCE
                } else {
                    state | SCHEDULED
                };

                // Mark the task as scheduled.
                match (*raw.header).state.compare_exchange_weak(
                    state,
                    new,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // If the task is not running, now is the time to schedule.
                        if state & RUNNING == 0 {
                            // If the reference count overflowed, abort.
                            if state > isize::MAX as usize {
                                abort();
                            }

                            // Schedule the task. There is no need to call `Self::schedule(ptr)`
                            // because the schedule function cannot be destroyed while the waker is
                            // still alive.
                            let task = Runnable {
                                ptr: NonNull::new_unchecked(ptr as *mut ()),
                                _marker: PhantomData,
                            };
                            (*raw.schedule).schedule(task, ScheduleInfo::new(false));
                        }

                        break;
                    }
                    Err(s) => state = s,
                }
            }
        }
    }

    /// Clones a waker.
    unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
        let raw = Self::from_ptr(ptr);

        // Increment the reference count. With any kind of reference-counted data structure,
        // relaxed ordering is appropriate when incrementing the counter.
        let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);

        // If the reference count overflowed, abort.
        if state > isize::MAX as usize {
            abort();
        }

        RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)
    }

    /// Drops a waker.
    ///
    /// This function will decrement the reference count. If it drops down to zero, the associated
    /// `Task` has been dropped too, and the task has not been completed, then it will get
    /// scheduled one more time so that its future gets dropped by the executor.
    #[inline]
    unsafe fn drop_waker(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `Task` has been dropped too,
        // then we need to decide how to destroy the task.
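        // (`new & !(REFERENCE - 1)` isolates the reference-count bits, which sit
        // above the flag bits, so this checks that the count has reached zero.)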
        if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
            if new & (COMPLETED | CLOSED) == 0 {
                // If the task was not completed nor closed, close it and schedule one more time so
                // that its future gets dropped by the executor.
                (*raw.header)
                    .state
                    .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release);
                Self::schedule(ptr, ScheduleInfo::new(false));
            } else {
                // Otherwise, destroy the task right away.
                Self::destroy(ptr);
            }
        }
    }

    /// Drops a task reference (`Runnable` or `Waker`).
    ///
    /// This function will decrement the reference count. If it drops down to zero and the
    /// associated `Task` handle has been dropped too, then the task gets destroyed.
    #[inline]
    unsafe fn drop_ref(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // Decrement the reference count.
        let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;

        // If this was the last reference to the task and the `Task` has been dropped too,
        // then destroy the task.
        if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
            Self::destroy(ptr);
        }
    }

    /// Schedules a task for running.
    ///
    /// This function doesn't modify the state of the task. It only passes the task reference to
    /// its schedule function.
    unsafe fn schedule(ptr: *const (), info: ScheduleInfo) {
        let raw = Self::from_ptr(ptr);

        // If the schedule function has captured variables, create a temporary waker that prevents
        // the task from getting deallocated while the function is being invoked.
        let _waker;
        if mem::size_of::<S>() > 0 {
            _waker = Waker::from_raw(Self::clone_waker(ptr));
        }

        let task = Runnable {
            ptr: NonNull::new_unchecked(ptr as *mut ()),
            _marker: PhantomData,
        };
        (*raw.schedule).schedule(task, info);
    }

    /// Drops the future inside a task.
    #[inline]
    unsafe fn drop_future(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);

        // We need a safeguard against panics because the destructor can panic.
        abort_on_panic(|| {
            raw.future.drop_in_place();
        })
    }

    /// Returns a pointer to the output inside a task.
    unsafe fn get_output(ptr: *const ()) -> *const () {
        let raw = Self::from_ptr(ptr);
        raw.output as *const ()
    }

    /// Cleans up the task's resources and deallocates it.
    ///
    /// The schedule function will be dropped, and the task will then get deallocated.
    /// The task must be closed before this function is called.
    #[inline]
    unsafe fn destroy(ptr: *const ()) {
        let raw = Self::from_ptr(ptr);
        let task_layout = Self::task_layout();

        // We need a safeguard against panics because destructors can panic.
        abort_on_panic(|| {
            // Drop the header along with the metadata.
            (raw.header as *mut Header<M>).drop_in_place();

            // Drop the schedule function.
            (raw.schedule as *mut S).drop_in_place();
        });

        // Finally, deallocate the memory reserved by the task.
        alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout);
    }

    /// Runs a task.
    ///
    /// If polling its future panics, the task will be closed and the panic will be propagated into
    /// the caller.
    unsafe fn run(ptr: *const ()) -> bool {
        let raw = Self::from_ptr(ptr);

        // Create a context from the raw task pointer and the vtable inside its header.
        let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE)));
        let cx = &mut Context::from_waker(&waker);

        let mut state = (*raw.header).state.load(Ordering::Acquire);

        // Update the task's state before polling its future.
        loop {
            // If the task has already been closed, drop the task reference and return.
            if state & CLOSED != 0 {
                // Drop the future.
                Self::drop_future(ptr);

                // Mark the task as unscheduled.
                let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);

                // Take the awaiter out.
                let mut awaiter = None;
                if state & AWAITER != 0 {
                    awaiter = (*raw.header).take(None);
                }

                // Drop the task reference.
                Self::drop_ref(ptr);

                // Notify the awaiter that the future has been dropped.
                if let Some(w) = awaiter {
                    abort_on_panic(|| w.wake());
                }
                return false;
            }

            // Mark the task as unscheduled and running.
            match (*raw.header).state.compare_exchange_weak(
                state,
                (state & !SCHEDULED) | RUNNING,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    // Update the state because we're continuing with polling the future.
                    state = (state & !SCHEDULED) | RUNNING;
                    break;
                }
                Err(s) => state = s,
            }
        }

        // Poll the inner future, but surround it with a guard that closes the task in case polling
        // panics.
        // If available, we should also try to catch the panic so that it is propagated correctly.
        let guard = Guard(raw);

        // Panic propagation is not available for no_std.
        #[cfg(not(feature = "std"))]
        let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok);

        #[cfg(feature = "std")]
        let poll = {
            // Check if we should propagate panics.
            if (*raw.header).propagate_panic {
                // Use catch_unwind to catch the panic.
                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx)
                })) {
                    Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
                    Ok(Poll::Pending) => Poll::Pending,
                    Err(e) => Poll::Ready(Err(e)),
                }
            } else {
                <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok)
            }
        };

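        // Polling returned without unwinding past this point, so the panic
        // guard is no longer needed and can be defused.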
        mem::forget(guard);

        match poll {
            Poll::Ready(out) => {
                // Replace the future with its output.
                Self::drop_future(ptr);
                raw.output.write(out);

                // The task is now completed.
                loop {
                    // If the `Task` is dropped, we'll need to close it and drop the output.
                    let new = if state & TASK == 0 {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
                    } else {
                        (state & !RUNNING & !SCHEDULED) | COMPLETED
                    };

                    // Mark the task as not running and completed.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(_) => {
                            // If the `Task` is dropped or if the task was closed while running,
                            // now it's time to drop the output.
                            if state & TASK == 0 || state & CLOSED != 0 {
                                // Drop the output.
                                abort_on_panic(|| raw.output.drop_in_place());
                            }

                            // Take the awaiter out.
                            let mut awaiter = None;
                            if state & AWAITER != 0 {
                                awaiter = (*raw.header).take(None);
                            }

                            // Drop the task reference.
                            Self::drop_ref(ptr);

                            // Notify the awaiter that the task has completed.
                            if let Some(w) = awaiter {
                                abort_on_panic(|| w.wake());
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
            Poll::Pending => {
                let mut future_dropped = false;

                // The task is still not completed.
                loop {
                    // If the task was closed while running, we'll need to unschedule in case it
                    // was woken up and then destroy it.
                    let new = if state & CLOSED != 0 {
                        state & !RUNNING & !SCHEDULED
                    } else {
                        state & !RUNNING
                    };

                    if state & CLOSED != 0 && !future_dropped {
                        // The thread that closed the task didn't drop the future because it was
                        // running, so now it's our responsibility to do so.
                        Self::drop_future(ptr);
                        future_dropped = true;
                    }

                    // Mark the task as not running.
                    match (*raw.header).state.compare_exchange_weak(
                        state,
                        new,
                        Ordering::AcqRel,
                        Ordering::Acquire,
                    ) {
                        Ok(state) => {
                            // If the task was closed while running, we need to notify the awaiter.
                            // If the task was woken up while running, we need to schedule it.
                            // Otherwise, we just drop the task reference.
                            if state & CLOSED != 0 {
                                // Take the awaiter out.
                                let mut awaiter = None;
                                if state & AWAITER != 0 {
                                    awaiter = (*raw.header).take(None);
                                }

                                // Drop the task reference.
                                Self::drop_ref(ptr);

                                // Notify the awaiter that the future has been dropped.
                                if let Some(w) = awaiter {
                                    abort_on_panic(|| w.wake());
                                }
                            } else if state & SCHEDULED != 0 {
                                // The thread that woke the task up didn't reschedule it because
                                // it was running, so now it's our responsibility to do so.
                                Self::schedule(ptr, ScheduleInfo::new(true));
                                return true;
                            } else {
                                // Drop the task reference.
                                Self::drop_ref(ptr);
                            }
                            break;
                        }
                        Err(s) => state = s,
                    }
                }
            }
        }

        return false;

        /// A guard that closes the task if polling its future panics.
        struct Guard<F, T, S, M>(RawTask<F, T, S, M>)
        where
            F: Future<Output = T>,
            S: Schedule<M>;

        impl<F, T, S, M> Drop for Guard<F, T, S, M>
        where
            F: Future<Output = T>,
            S: Schedule<M>,
        {
            fn drop(&mut self) {
                let raw = self.0;
                let ptr = raw.header as *const ();

                unsafe {
                    let mut state = (*raw.header).state.load(Ordering::Acquire);

                    loop {
                        // If the task was closed while running, then unschedule it, drop its
                        // future, and drop the task reference.
                        if state & CLOSED != 0 {
                            // The thread that closed the task didn't drop the future because it
                            // was running, so now it's our responsibility to do so.
                            RawTask::<F, T, S, M>::drop_future(ptr);

                            // Mark the task as not running and not scheduled.
                            (*raw.header)
                                .state
                                .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);

                            // Take the awaiter out.
                            let mut awaiter = None;
                            if state & AWAITER != 0 {
                                awaiter = (*raw.header).take(None);
                            }

                            // Drop the task reference.
                            RawTask::<F, T, S, M>::drop_ref(ptr);

                            // Notify the awaiter that the future has been dropped.
                            if let Some(w) = awaiter {
                                abort_on_panic(|| w.wake());
                            }
                            break;
                        }

                        // Mark the task as not running, not scheduled, and closed.
                        match (*raw.header).state.compare_exchange_weak(
                            state,
                            (state & !RUNNING & !SCHEDULED) | CLOSED,
                            Ordering::AcqRel,
                            Ordering::Acquire,
                        ) {
                            Ok(state) => {
                                // Drop the future because the task is now closed.
                                RawTask::<F, T, S, M>::drop_future(ptr);

                                // Take the awaiter out.
                                let mut awaiter = None;
                                if state & AWAITER != 0 {
                                    awaiter = (*raw.header).take(None);
                                }

                                // Drop the task reference.
                                RawTask::<F, T, S, M>::drop_ref(ptr);

                                // Notify the awaiter that the future has been dropped.
                                if let Some(w) = awaiter {
                                    abort_on_panic(|| w.wake());
                                }
                                break;
                            }
                            Err(s) => state = s,
                        }
                    }
                }
            }
        }
    }
}