1 use alloc::alloc::Layout as StdLayout; 2 use core::cell::UnsafeCell; 3 use core::future::Future; 4 use core::marker::PhantomData; 5 use core::mem::{self, ManuallyDrop}; 6 use core::pin::Pin; 7 use core::ptr::NonNull; 8 use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; 9 10 #[cfg(not(feature = "portable-atomic"))] 11 use core::sync::atomic::AtomicUsize; 12 use core::sync::atomic::Ordering; 13 #[cfg(feature = "portable-atomic")] 14 use portable_atomic::AtomicUsize; 15 16 use crate::header::Header; 17 use crate::runnable::{Schedule, ScheduleInfo}; 18 use crate::state::*; 19 use crate::utils::{abort, abort_on_panic, max, Layout}; 20 use crate::Runnable; 21 22 #[cfg(feature = "std")] 23 pub(crate) type Panic = alloc::boxed::Box<dyn core::any::Any + Send + 'static>; 24 25 #[cfg(not(feature = "std"))] 26 pub(crate) type Panic = core::convert::Infallible; 27 28 /// The vtable for a task. 29 pub(crate) struct TaskVTable { 30 /// Schedules the task. 31 pub(crate) schedule: unsafe fn(*const (), ScheduleInfo), 32 33 /// Drops the future inside the task. 34 pub(crate) drop_future: unsafe fn(*const ()), 35 36 /// Returns a pointer to the output stored after completion. 37 pub(crate) get_output: unsafe fn(*const ()) -> *const (), 38 39 /// Drops the task reference (`Runnable` or `Waker`). 40 pub(crate) drop_ref: unsafe fn(ptr: *const ()), 41 42 /// Destroys the task. 43 pub(crate) destroy: unsafe fn(*const ()), 44 45 /// Runs the task. 46 pub(crate) run: unsafe fn(*const ()) -> bool, 47 48 /// Creates a new waker associated with the task. 49 pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker, 50 51 /// The memory layout of the task. This information enables 52 /// debuggers to decode raw task memory blobs. Do not remove 53 /// the field, even if it appears to be unused. 54 #[allow(unused)] 55 pub(crate) layout_info: &'static Option<TaskLayout>, 56 } 57 58 /// Memory layout of a task. 59 /// 60 /// This struct contains the following information: 61 /// 62 /// 1. How to allocate and deallocate the task. 63 /// 2. How to access the fields inside the task. 64 #[derive(Clone, Copy)] 65 pub(crate) struct TaskLayout { 66 /// Memory layout of the whole task. 67 pub(crate) layout: StdLayout, 68 69 /// Offset into the task at which the schedule function is stored. 70 pub(crate) offset_s: usize, 71 72 /// Offset into the task at which the future is stored. 73 pub(crate) offset_f: usize, 74 75 /// Offset into the task at which the output is stored. 76 pub(crate) offset_r: usize, 77 } 78 79 /// Raw pointers to the fields inside a task. 80 pub(crate) struct RawTask<F, T, S, M> { 81 /// The task header. 82 pub(crate) header: *const Header<M>, 83 84 /// The schedule function. 85 pub(crate) schedule: *const S, 86 87 /// The future. 88 pub(crate) future: *mut F, 89 90 /// The output of the future. 91 pub(crate) output: *mut Result<T, Panic>, 92 } 93 94 impl<F, T, S, M> Copy for RawTask<F, T, S, M> {} 95 96 impl<F, T, S, M> Clone for RawTask<F, T, S, M> { clone(&self) -> Self97 fn clone(&self) -> Self { 98 *self 99 } 100 } 101 102 impl<F, T, S, M> RawTask<F, T, S, M> { 103 const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout(); 104 105 /// Computes the memory layout for a task. 106 #[inline] eval_task_layout() -> Option<TaskLayout>107 const fn eval_task_layout() -> Option<TaskLayout> { 108 // Compute the layouts for `Header`, `S`, `F`, and `T`. 109 let layout_header = Layout::new::<Header<M>>(); 110 let layout_s = Layout::new::<S>(); 111 let layout_f = Layout::new::<F>(); 112 let layout_r = Layout::new::<Result<T, Panic>>(); 113 114 // Compute the layout for `union { F, T }`. 115 let size_union = max(layout_f.size(), layout_r.size()); 116 let align_union = max(layout_f.align(), layout_r.align()); 117 let layout_union = Layout::from_size_align(size_union, align_union); 118 119 // Compute the layout for `Header` followed `S` and `union { F, T }`. 120 let layout = layout_header; 121 let (layout, offset_s) = leap!(layout.extend(layout_s)); 122 let (layout, offset_union) = leap!(layout.extend(layout_union)); 123 let offset_f = offset_union; 124 let offset_r = offset_union; 125 126 Some(TaskLayout { 127 layout: unsafe { layout.into_std() }, 128 offset_s, 129 offset_f, 130 offset_r, 131 }) 132 } 133 } 134 135 impl<F, T, S, M> RawTask<F, T, S, M> 136 where 137 F: Future<Output = T>, 138 S: Schedule<M>, 139 { 140 const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( 141 Self::clone_waker, 142 Self::wake, 143 Self::wake_by_ref, 144 Self::drop_waker, 145 ); 146 147 /// Allocates a task with the given `future` and `schedule` function. 148 /// 149 /// It is assumed that initially only the `Runnable` and the `Task` exist. allocate<'a, Gen: FnOnce(&'a M) -> F>( future: Gen, schedule: S, builder: crate::Builder<M>, ) -> NonNull<()> where F: 'a, M: 'a,150 pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>( 151 future: Gen, 152 schedule: S, 153 builder: crate::Builder<M>, 154 ) -> NonNull<()> 155 where 156 F: 'a, 157 M: 'a, 158 { 159 // Compute the layout of the task for allocation. Abort if the computation fails. 160 // 161 // n.b. notgull: task_layout now automatically aborts instead of panicking 162 let task_layout = Self::task_layout(); 163 164 unsafe { 165 // Allocate enough space for the entire task. 166 let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) { 167 None => abort(), 168 Some(p) => p, 169 }; 170 171 let raw = Self::from_ptr(ptr.as_ptr()); 172 173 let crate::Builder { 174 metadata, 175 #[cfg(feature = "std")] 176 propagate_panic, 177 } = builder; 178 179 // Write the header as the first field of the task. 180 (raw.header as *mut Header<M>).write(Header { 181 state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE), 182 awaiter: UnsafeCell::new(None), 183 vtable: &TaskVTable { 184 schedule: Self::schedule, 185 drop_future: Self::drop_future, 186 get_output: Self::get_output, 187 drop_ref: Self::drop_ref, 188 destroy: Self::destroy, 189 run: Self::run, 190 clone_waker: Self::clone_waker, 191 layout_info: &Self::TASK_LAYOUT, 192 }, 193 metadata, 194 #[cfg(feature = "std")] 195 propagate_panic, 196 }); 197 198 // Write the schedule function as the third field of the task. 199 (raw.schedule as *mut S).write(schedule); 200 201 // Generate the future, now that the metadata has been pinned in place. 202 let future = abort_on_panic(|| future(&(*raw.header).metadata)); 203 204 // Write the future as the fourth field of the task. 205 raw.future.write(future); 206 207 ptr 208 } 209 } 210 211 /// Creates a `RawTask` from a raw task pointer. 212 #[inline] from_ptr(ptr: *const ()) -> Self213 pub(crate) fn from_ptr(ptr: *const ()) -> Self { 214 let task_layout = Self::task_layout(); 215 let p = ptr as *const u8; 216 217 unsafe { 218 Self { 219 header: p as *const Header<M>, 220 schedule: p.add(task_layout.offset_s) as *const S, 221 future: p.add(task_layout.offset_f) as *mut F, 222 output: p.add(task_layout.offset_r) as *mut Result<T, Panic>, 223 } 224 } 225 } 226 227 /// Returns the layout of the task. 228 #[inline] task_layout() -> TaskLayout229 fn task_layout() -> TaskLayout { 230 match Self::TASK_LAYOUT { 231 Some(tl) => tl, 232 None => abort(), 233 } 234 } 235 236 /// Wakes a waker. wake(ptr: *const ())237 unsafe fn wake(ptr: *const ()) { 238 // This is just an optimization. If the schedule function has captured variables, then 239 // we'll do less reference counting if we wake the waker by reference and then drop it. 240 if mem::size_of::<S>() > 0 { 241 Self::wake_by_ref(ptr); 242 Self::drop_waker(ptr); 243 return; 244 } 245 246 let raw = Self::from_ptr(ptr); 247 248 let mut state = (*raw.header).state.load(Ordering::Acquire); 249 250 loop { 251 // If the task is completed or closed, it can't be woken up. 252 if state & (COMPLETED | CLOSED) != 0 { 253 // Drop the waker. 254 Self::drop_waker(ptr); 255 break; 256 } 257 258 // If the task is already scheduled, we just need to synchronize with the thread that 259 // will run the task by "publishing" our current view of the memory. 260 if state & SCHEDULED != 0 { 261 // Update the state without actually modifying it. 262 match (*raw.header).state.compare_exchange_weak( 263 state, 264 state, 265 Ordering::AcqRel, 266 Ordering::Acquire, 267 ) { 268 Ok(_) => { 269 // Drop the waker. 270 Self::drop_waker(ptr); 271 break; 272 } 273 Err(s) => state = s, 274 } 275 } else { 276 // Mark the task as scheduled. 277 match (*raw.header).state.compare_exchange_weak( 278 state, 279 state | SCHEDULED, 280 Ordering::AcqRel, 281 Ordering::Acquire, 282 ) { 283 Ok(_) => { 284 // If the task is not yet scheduled and isn't currently running, now is the 285 // time to schedule it. 286 if state & RUNNING == 0 { 287 // Schedule the task. 288 Self::schedule(ptr, ScheduleInfo::new(false)); 289 } else { 290 // Drop the waker. 291 Self::drop_waker(ptr); 292 } 293 294 break; 295 } 296 Err(s) => state = s, 297 } 298 } 299 } 300 } 301 302 /// Wakes a waker by reference. wake_by_ref(ptr: *const ())303 unsafe fn wake_by_ref(ptr: *const ()) { 304 let raw = Self::from_ptr(ptr); 305 306 let mut state = (*raw.header).state.load(Ordering::Acquire); 307 308 loop { 309 // If the task is completed or closed, it can't be woken up. 310 if state & (COMPLETED | CLOSED) != 0 { 311 break; 312 } 313 314 // If the task is already scheduled, we just need to synchronize with the thread that 315 // will run the task by "publishing" our current view of the memory. 316 if state & SCHEDULED != 0 { 317 // Update the state without actually modifying it. 318 match (*raw.header).state.compare_exchange_weak( 319 state, 320 state, 321 Ordering::AcqRel, 322 Ordering::Acquire, 323 ) { 324 Ok(_) => break, 325 Err(s) => state = s, 326 } 327 } else { 328 // If the task is not running, we can schedule right away. 329 let new = if state & RUNNING == 0 { 330 (state | SCHEDULED) + REFERENCE 331 } else { 332 state | SCHEDULED 333 }; 334 335 // Mark the task as scheduled. 336 match (*raw.header).state.compare_exchange_weak( 337 state, 338 new, 339 Ordering::AcqRel, 340 Ordering::Acquire, 341 ) { 342 Ok(_) => { 343 // If the task is not running, now is the time to schedule. 344 if state & RUNNING == 0 { 345 // If the reference count overflowed, abort. 346 if state > isize::MAX as usize { 347 abort(); 348 } 349 350 // Schedule the task. There is no need to call `Self::schedule(ptr)` 351 // because the schedule function cannot be destroyed while the waker is 352 // still alive. 353 let task = Runnable { 354 ptr: NonNull::new_unchecked(ptr as *mut ()), 355 _marker: PhantomData, 356 }; 357 (*raw.schedule).schedule(task, ScheduleInfo::new(false)); 358 } 359 360 break; 361 } 362 Err(s) => state = s, 363 } 364 } 365 } 366 } 367 368 /// Clones a waker. clone_waker(ptr: *const ()) -> RawWaker369 unsafe fn clone_waker(ptr: *const ()) -> RawWaker { 370 let raw = Self::from_ptr(ptr); 371 372 // Increment the reference count. With any kind of reference-counted data structure, 373 // relaxed ordering is appropriate when incrementing the counter. 374 let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); 375 376 // If the reference count overflowed, abort. 377 if state > isize::MAX as usize { 378 abort(); 379 } 380 381 RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE) 382 } 383 384 /// Drops a waker. 385 /// 386 /// This function will decrement the reference count. If it drops down to zero, the associated 387 /// `Task` has been dropped too, and the task has not been completed, then it will get 388 /// scheduled one more time so that its future gets dropped by the executor. 389 #[inline] drop_waker(ptr: *const ())390 unsafe fn drop_waker(ptr: *const ()) { 391 let raw = Self::from_ptr(ptr); 392 393 // Decrement the reference count. 394 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; 395 396 // If this was the last reference to the task and the `Task` has been dropped too, 397 // then we need to decide how to destroy the task. 398 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { 399 if new & (COMPLETED | CLOSED) == 0 { 400 // If the task was not completed nor closed, close it and schedule one more time so 401 // that its future gets dropped by the executor. 402 (*raw.header) 403 .state 404 .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); 405 Self::schedule(ptr, ScheduleInfo::new(false)); 406 } else { 407 // Otherwise, destroy the task right away. 408 Self::destroy(ptr); 409 } 410 } 411 } 412 413 /// Drops a task reference (`Runnable` or `Waker`). 414 /// 415 /// This function will decrement the reference count. If it drops down to zero and the 416 /// associated `Task` handle has been dropped too, then the task gets destroyed. 417 #[inline] drop_ref(ptr: *const ())418 unsafe fn drop_ref(ptr: *const ()) { 419 let raw = Self::from_ptr(ptr); 420 421 // Decrement the reference count. 422 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; 423 424 // If this was the last reference to the task and the `Task` has been dropped too, 425 // then destroy the task. 426 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { 427 Self::destroy(ptr); 428 } 429 } 430 431 /// Schedules a task for running. 432 /// 433 /// This function doesn't modify the state of the task. It only passes the task reference to 434 /// its schedule function. schedule(ptr: *const (), info: ScheduleInfo)435 unsafe fn schedule(ptr: *const (), info: ScheduleInfo) { 436 let raw = Self::from_ptr(ptr); 437 438 // If the schedule function has captured variables, create a temporary waker that prevents 439 // the task from getting deallocated while the function is being invoked. 440 let _waker; 441 if mem::size_of::<S>() > 0 { 442 _waker = Waker::from_raw(Self::clone_waker(ptr)); 443 } 444 445 let task = Runnable { 446 ptr: NonNull::new_unchecked(ptr as *mut ()), 447 _marker: PhantomData, 448 }; 449 (*raw.schedule).schedule(task, info); 450 } 451 452 /// Drops the future inside a task. 453 #[inline] drop_future(ptr: *const ())454 unsafe fn drop_future(ptr: *const ()) { 455 let raw = Self::from_ptr(ptr); 456 457 // We need a safeguard against panics because the destructor can panic. 458 abort_on_panic(|| { 459 raw.future.drop_in_place(); 460 }) 461 } 462 463 /// Returns a pointer to the output inside a task. get_output(ptr: *const ()) -> *const ()464 unsafe fn get_output(ptr: *const ()) -> *const () { 465 let raw = Self::from_ptr(ptr); 466 raw.output as *const () 467 } 468 469 /// Cleans up task's resources and deallocates it. 470 /// 471 /// The schedule function will be dropped, and the task will then get deallocated. 472 /// The task must be closed before this function is called. 473 #[inline] destroy(ptr: *const ())474 unsafe fn destroy(ptr: *const ()) { 475 let raw = Self::from_ptr(ptr); 476 let task_layout = Self::task_layout(); 477 478 // We need a safeguard against panics because destructors can panic. 479 abort_on_panic(|| { 480 // Drop the header along with the metadata. 481 (raw.header as *mut Header<M>).drop_in_place(); 482 483 // Drop the schedule function. 484 (raw.schedule as *mut S).drop_in_place(); 485 }); 486 487 // Finally, deallocate the memory reserved by the task. 488 alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout); 489 } 490 491 /// Runs a task. 492 /// 493 /// If polling its future panics, the task will be closed and the panic will be propagated into 494 /// the caller. run(ptr: *const ()) -> bool495 unsafe fn run(ptr: *const ()) -> bool { 496 let raw = Self::from_ptr(ptr); 497 498 // Create a context from the raw task pointer and the vtable inside the its header. 499 let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE))); 500 let cx = &mut Context::from_waker(&waker); 501 502 let mut state = (*raw.header).state.load(Ordering::Acquire); 503 504 // Update the task's state before polling its future. 505 loop { 506 // If the task has already been closed, drop the task reference and return. 507 if state & CLOSED != 0 { 508 // Drop the future. 509 Self::drop_future(ptr); 510 511 // Mark the task as unscheduled. 512 let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); 513 514 // Take the awaiter out. 515 let mut awaiter = None; 516 if state & AWAITER != 0 { 517 awaiter = (*raw.header).take(None); 518 } 519 520 // Drop the task reference. 521 Self::drop_ref(ptr); 522 523 // Notify the awaiter that the future has been dropped. 524 if let Some(w) = awaiter { 525 abort_on_panic(|| w.wake()); 526 } 527 return false; 528 } 529 530 // Mark the task as unscheduled and running. 531 match (*raw.header).state.compare_exchange_weak( 532 state, 533 (state & !SCHEDULED) | RUNNING, 534 Ordering::AcqRel, 535 Ordering::Acquire, 536 ) { 537 Ok(_) => { 538 // Update the state because we're continuing with polling the future. 539 state = (state & !SCHEDULED) | RUNNING; 540 break; 541 } 542 Err(s) => state = s, 543 } 544 } 545 546 // Poll the inner future, but surround it with a guard that closes the task in case polling 547 // panics. 548 // If available, we should also try to catch the panic so that it is propagated correctly. 549 let guard = Guard(raw); 550 551 // Panic propagation is not available for no_std. 552 #[cfg(not(feature = "std"))] 553 let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok); 554 555 #[cfg(feature = "std")] 556 let poll = { 557 // Check if we should propagate panics. 558 if (*raw.header).propagate_panic { 559 // Use catch_unwind to catch the panic. 560 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { 561 <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx) 562 })) { 563 Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)), 564 Ok(Poll::Pending) => Poll::Pending, 565 Err(e) => Poll::Ready(Err(e)), 566 } 567 } else { 568 <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok) 569 } 570 }; 571 572 mem::forget(guard); 573 574 match poll { 575 Poll::Ready(out) => { 576 // Replace the future with its output. 577 Self::drop_future(ptr); 578 raw.output.write(out); 579 580 // The task is now completed. 581 loop { 582 // If the `Task` is dropped, we'll need to close it and drop the output. 583 let new = if state & TASK == 0 { 584 (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED 585 } else { 586 (state & !RUNNING & !SCHEDULED) | COMPLETED 587 }; 588 589 // Mark the task as not running and completed. 590 match (*raw.header).state.compare_exchange_weak( 591 state, 592 new, 593 Ordering::AcqRel, 594 Ordering::Acquire, 595 ) { 596 Ok(_) => { 597 // If the `Task` is dropped or if the task was closed while running, 598 // now it's time to drop the output. 599 if state & TASK == 0 || state & CLOSED != 0 { 600 // Drop the output. 601 abort_on_panic(|| raw.output.drop_in_place()); 602 } 603 604 // Take the awaiter out. 605 let mut awaiter = None; 606 if state & AWAITER != 0 { 607 awaiter = (*raw.header).take(None); 608 } 609 610 // Drop the task reference. 611 Self::drop_ref(ptr); 612 613 // Notify the awaiter that the future has been dropped. 614 if let Some(w) = awaiter { 615 abort_on_panic(|| w.wake()); 616 } 617 break; 618 } 619 Err(s) => state = s, 620 } 621 } 622 } 623 Poll::Pending => { 624 let mut future_dropped = false; 625 626 // The task is still not completed. 627 loop { 628 // If the task was closed while running, we'll need to unschedule in case it 629 // was woken up and then destroy it. 630 let new = if state & CLOSED != 0 { 631 state & !RUNNING & !SCHEDULED 632 } else { 633 state & !RUNNING 634 }; 635 636 if state & CLOSED != 0 && !future_dropped { 637 // The thread that closed the task didn't drop the future because it was 638 // running so now it's our responsibility to do so. 639 Self::drop_future(ptr); 640 future_dropped = true; 641 } 642 643 // Mark the task as not running. 644 match (*raw.header).state.compare_exchange_weak( 645 state, 646 new, 647 Ordering::AcqRel, 648 Ordering::Acquire, 649 ) { 650 Ok(state) => { 651 // If the task was closed while running, we need to notify the awaiter. 652 // If the task was woken up while running, we need to schedule it. 653 // Otherwise, we just drop the task reference. 654 if state & CLOSED != 0 { 655 // Take the awaiter out. 656 let mut awaiter = None; 657 if state & AWAITER != 0 { 658 awaiter = (*raw.header).take(None); 659 } 660 661 // Drop the task reference. 662 Self::drop_ref(ptr); 663 664 // Notify the awaiter that the future has been dropped. 665 if let Some(w) = awaiter { 666 abort_on_panic(|| w.wake()); 667 } 668 } else if state & SCHEDULED != 0 { 669 // The thread that woke the task up didn't reschedule it because 670 // it was running so now it's our responsibility to do so. 671 Self::schedule(ptr, ScheduleInfo::new(true)); 672 return true; 673 } else { 674 // Drop the task reference. 675 Self::drop_ref(ptr); 676 } 677 break; 678 } 679 Err(s) => state = s, 680 } 681 } 682 } 683 } 684 685 return false; 686 687 /// A guard that closes the task if polling its future panics. 688 struct Guard<F, T, S, M>(RawTask<F, T, S, M>) 689 where 690 F: Future<Output = T>, 691 S: Schedule<M>; 692 693 impl<F, T, S, M> Drop for Guard<F, T, S, M> 694 where 695 F: Future<Output = T>, 696 S: Schedule<M>, 697 { 698 fn drop(&mut self) { 699 let raw = self.0; 700 let ptr = raw.header as *const (); 701 702 unsafe { 703 let mut state = (*raw.header).state.load(Ordering::Acquire); 704 705 loop { 706 // If the task was closed while running, then unschedule it, drop its 707 // future, and drop the task reference. 708 if state & CLOSED != 0 { 709 // The thread that closed the task didn't drop the future because it 710 // was running so now it's our responsibility to do so. 711 RawTask::<F, T, S, M>::drop_future(ptr); 712 713 // Mark the task as not running and not scheduled. 714 (*raw.header) 715 .state 716 .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); 717 718 // Take the awaiter out. 719 let mut awaiter = None; 720 if state & AWAITER != 0 { 721 awaiter = (*raw.header).take(None); 722 } 723 724 // Drop the task reference. 725 RawTask::<F, T, S, M>::drop_ref(ptr); 726 727 // Notify the awaiter that the future has been dropped. 728 if let Some(w) = awaiter { 729 abort_on_panic(|| w.wake()); 730 } 731 break; 732 } 733 734 // Mark the task as not running, not scheduled, and closed. 735 match (*raw.header).state.compare_exchange_weak( 736 state, 737 (state & !RUNNING & !SCHEDULED) | CLOSED, 738 Ordering::AcqRel, 739 Ordering::Acquire, 740 ) { 741 Ok(state) => { 742 // Drop the future because the task is now closed. 743 RawTask::<F, T, S, M>::drop_future(ptr); 744 745 // Take the awaiter out. 746 let mut awaiter = None; 747 if state & AWAITER != 0 { 748 awaiter = (*raw.header).take(None); 749 } 750 751 // Drop the task reference. 752 RawTask::<F, T, S, M>::drop_ref(ptr); 753 754 // Notify the awaiter that the future has been dropped. 755 if let Some(w) = awaiter { 756 abort_on_panic(|| w.wake()); 757 } 758 break; 759 } 760 Err(s) => state = s, 761 } 762 } 763 } 764 } 765 } 766 } 767 } 768