1 use alloc::alloc::Layout as StdLayout; 2 use core::cell::UnsafeCell; 3 use core::future::Future; 4 use core::mem::{self, ManuallyDrop}; 5 use core::pin::Pin; 6 use core::ptr::NonNull; 7 use core::sync::atomic::{AtomicUsize, Ordering}; 8 use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; 9 10 use crate::header::Header; 11 use crate::state::*; 12 use crate::utils::{abort, abort_on_panic, max, Layout}; 13 use crate::Runnable; 14 15 /// The vtable for a task. 16 pub(crate) struct TaskVTable { 17 /// Schedules the task. 18 pub(crate) schedule: unsafe fn(*const ()), 19 20 /// Drops the future inside the task. 21 pub(crate) drop_future: unsafe fn(*const ()), 22 23 /// Returns a pointer to the output stored after completion. 24 pub(crate) get_output: unsafe fn(*const ()) -> *const (), 25 26 /// Drops the task reference (`Runnable` or `Waker`). 27 pub(crate) drop_ref: unsafe fn(ptr: *const ()), 28 29 /// Destroys the task. 30 pub(crate) destroy: unsafe fn(*const ()), 31 32 /// Runs the task. 33 pub(crate) run: unsafe fn(*const ()) -> bool, 34 35 /// Creates a new waker associated with the task. 36 pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker, 37 38 /// The memory layout of the task. This information enables 39 /// debuggers to decode raw task memory blobs. Do not remove 40 /// the field, even if it appears to be unused. 41 #[allow(unused)] 42 pub(crate) layout_info: &'static Option<TaskLayout>, 43 } 44 45 /// Memory layout of a task. 46 /// 47 /// This struct contains the following information: 48 /// 49 /// 1. How to allocate and deallocate the task. 50 /// 2. How to access the fields inside the task. 51 #[derive(Clone, Copy)] 52 pub(crate) struct TaskLayout { 53 /// Memory layout of the whole task. 54 pub(crate) layout: StdLayout, 55 56 /// Offset into the task at which the schedule function is stored. 57 pub(crate) offset_s: usize, 58 59 /// Offset into the task at which the future is stored. 60 pub(crate) offset_f: usize, 61 62 /// Offset into the task at which the output is stored. 63 pub(crate) offset_r: usize, 64 } 65 66 /// Raw pointers to the fields inside a task. 67 pub(crate) struct RawTask<F, T, S> { 68 /// The task header. 69 pub(crate) header: *const Header, 70 71 /// The schedule function. 72 pub(crate) schedule: *const S, 73 74 /// The future. 75 pub(crate) future: *mut F, 76 77 /// The output of the future. 78 pub(crate) output: *mut T, 79 } 80 81 impl<F, T, S> Copy for RawTask<F, T, S> {} 82 83 impl<F, T, S> Clone for RawTask<F, T, S> { clone(&self) -> Self84 fn clone(&self) -> Self { 85 *self 86 } 87 } 88 89 impl<F, T, S> RawTask<F, T, S> { 90 const TASK_LAYOUT: Option<TaskLayout> = Self::eval_task_layout(); 91 92 /// Computes the memory layout for a task. 93 #[inline] eval_task_layout() -> Option<TaskLayout>94 const fn eval_task_layout() -> Option<TaskLayout> { 95 // Compute the layouts for `Header`, `S`, `F`, and `T`. 96 let layout_header = Layout::new::<Header>(); 97 let layout_s = Layout::new::<S>(); 98 let layout_f = Layout::new::<F>(); 99 let layout_r = Layout::new::<T>(); 100 101 // Compute the layout for `union { F, T }`. 102 let size_union = max(layout_f.size(), layout_r.size()); 103 let align_union = max(layout_f.align(), layout_r.align()); 104 let layout_union = Layout::from_size_align(size_union, align_union); 105 106 // Compute the layout for `Header` followed `S` and `union { F, T }`. 107 let layout = layout_header; 108 let (layout, offset_s) = leap!(layout.extend(layout_s)); 109 let (layout, offset_union) = leap!(layout.extend(layout_union)); 110 let offset_f = offset_union; 111 let offset_r = offset_union; 112 113 Some(TaskLayout { 114 layout: unsafe { layout.into_std() }, 115 offset_s, 116 offset_f, 117 offset_r, 118 }) 119 } 120 } 121 122 impl<F, T, S> RawTask<F, T, S> 123 where 124 F: Future<Output = T>, 125 S: Fn(Runnable), 126 { 127 const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( 128 Self::clone_waker, 129 Self::wake, 130 Self::wake_by_ref, 131 Self::drop_waker, 132 ); 133 134 /// Allocates a task with the given `future` and `schedule` function. 135 /// 136 /// It is assumed that initially only the `Runnable` and the `Task` exist. allocate(future: F, schedule: S) -> NonNull<()>137 pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> { 138 // Compute the layout of the task for allocation. Abort if the computation fails. 139 // 140 // n.b. notgull: task_layout now automatically aborts instead of panicking 141 let task_layout = Self::task_layout(); 142 143 unsafe { 144 // Allocate enough space for the entire task. 145 let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) { 146 None => abort(), 147 Some(p) => p, 148 }; 149 150 let raw = Self::from_ptr(ptr.as_ptr()); 151 152 // Write the header as the first field of the task. 153 (raw.header as *mut Header).write(Header { 154 state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE), 155 awaiter: UnsafeCell::new(None), 156 vtable: &TaskVTable { 157 schedule: Self::schedule, 158 drop_future: Self::drop_future, 159 get_output: Self::get_output, 160 drop_ref: Self::drop_ref, 161 destroy: Self::destroy, 162 run: Self::run, 163 clone_waker: Self::clone_waker, 164 layout_info: &Self::TASK_LAYOUT, 165 }, 166 }); 167 168 // Write the schedule function as the third field of the task. 169 (raw.schedule as *mut S).write(schedule); 170 171 // Write the future as the fourth field of the task. 172 raw.future.write(future); 173 174 ptr 175 } 176 } 177 178 /// Creates a `RawTask` from a raw task pointer. 179 #[inline] from_ptr(ptr: *const ()) -> Self180 pub(crate) fn from_ptr(ptr: *const ()) -> Self { 181 let task_layout = Self::task_layout(); 182 let p = ptr as *const u8; 183 184 unsafe { 185 Self { 186 header: p as *const Header, 187 schedule: p.add(task_layout.offset_s) as *const S, 188 future: p.add(task_layout.offset_f) as *mut F, 189 output: p.add(task_layout.offset_r) as *mut T, 190 } 191 } 192 } 193 194 /// Returns the layout of the task. 195 #[inline] task_layout() -> TaskLayout196 fn task_layout() -> TaskLayout { 197 match Self::TASK_LAYOUT { 198 Some(tl) => tl, 199 None => abort(), 200 } 201 } 202 203 /// Wakes a waker. wake(ptr: *const ())204 unsafe fn wake(ptr: *const ()) { 205 // This is just an optimization. If the schedule function has captured variables, then 206 // we'll do less reference counting if we wake the waker by reference and then drop it. 207 if mem::size_of::<S>() > 0 { 208 Self::wake_by_ref(ptr); 209 Self::drop_waker(ptr); 210 return; 211 } 212 213 let raw = Self::from_ptr(ptr); 214 215 let mut state = (*raw.header).state.load(Ordering::Acquire); 216 217 loop { 218 // If the task is completed or closed, it can't be woken up. 219 if state & (COMPLETED | CLOSED) != 0 { 220 // Drop the waker. 221 Self::drop_waker(ptr); 222 break; 223 } 224 225 // If the task is already scheduled, we just need to synchronize with the thread that 226 // will run the task by "publishing" our current view of the memory. 227 if state & SCHEDULED != 0 { 228 // Update the state without actually modifying it. 229 match (*raw.header).state.compare_exchange_weak( 230 state, 231 state, 232 Ordering::AcqRel, 233 Ordering::Acquire, 234 ) { 235 Ok(_) => { 236 // Drop the waker. 237 Self::drop_waker(ptr); 238 break; 239 } 240 Err(s) => state = s, 241 } 242 } else { 243 // Mark the task as scheduled. 244 match (*raw.header).state.compare_exchange_weak( 245 state, 246 state | SCHEDULED, 247 Ordering::AcqRel, 248 Ordering::Acquire, 249 ) { 250 Ok(_) => { 251 // If the task is not yet scheduled and isn't currently running, now is the 252 // time to schedule it. 253 if state & RUNNING == 0 { 254 // Schedule the task. 255 Self::schedule(ptr); 256 } else { 257 // Drop the waker. 258 Self::drop_waker(ptr); 259 } 260 261 break; 262 } 263 Err(s) => state = s, 264 } 265 } 266 } 267 } 268 269 /// Wakes a waker by reference. wake_by_ref(ptr: *const ())270 unsafe fn wake_by_ref(ptr: *const ()) { 271 let raw = Self::from_ptr(ptr); 272 273 let mut state = (*raw.header).state.load(Ordering::Acquire); 274 275 loop { 276 // If the task is completed or closed, it can't be woken up. 277 if state & (COMPLETED | CLOSED) != 0 { 278 break; 279 } 280 281 // If the task is already scheduled, we just need to synchronize with the thread that 282 // will run the task by "publishing" our current view of the memory. 283 if state & SCHEDULED != 0 { 284 // Update the state without actually modifying it. 285 match (*raw.header).state.compare_exchange_weak( 286 state, 287 state, 288 Ordering::AcqRel, 289 Ordering::Acquire, 290 ) { 291 Ok(_) => break, 292 Err(s) => state = s, 293 } 294 } else { 295 // If the task is not running, we can schedule right away. 296 let new = if state & RUNNING == 0 { 297 (state | SCHEDULED) + REFERENCE 298 } else { 299 state | SCHEDULED 300 }; 301 302 // Mark the task as scheduled. 303 match (*raw.header).state.compare_exchange_weak( 304 state, 305 new, 306 Ordering::AcqRel, 307 Ordering::Acquire, 308 ) { 309 Ok(_) => { 310 // If the task is not running, now is the time to schedule. 311 if state & RUNNING == 0 { 312 // If the reference count overflowed, abort. 313 if state > isize::max_value() as usize { 314 abort(); 315 } 316 317 // Schedule the task. There is no need to call `Self::schedule(ptr)` 318 // because the schedule function cannot be destroyed while the waker is 319 // still alive. 320 let task = Runnable { 321 ptr: NonNull::new_unchecked(ptr as *mut ()), 322 }; 323 (*raw.schedule)(task); 324 } 325 326 break; 327 } 328 Err(s) => state = s, 329 } 330 } 331 } 332 } 333 334 /// Clones a waker. clone_waker(ptr: *const ()) -> RawWaker335 unsafe fn clone_waker(ptr: *const ()) -> RawWaker { 336 let raw = Self::from_ptr(ptr); 337 338 // Increment the reference count. With any kind of reference-counted data structure, 339 // relaxed ordering is appropriate when incrementing the counter. 340 let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); 341 342 // If the reference count overflowed, abort. 343 if state > isize::max_value() as usize { 344 abort(); 345 } 346 347 RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE) 348 } 349 350 /// Drops a waker. 351 /// 352 /// This function will decrement the reference count. If it drops down to zero, the associated 353 /// `Task` has been dropped too, and the task has not been completed, then it will get 354 /// scheduled one more time so that its future gets dropped by the executor. 355 #[inline] drop_waker(ptr: *const ())356 unsafe fn drop_waker(ptr: *const ()) { 357 let raw = Self::from_ptr(ptr); 358 359 // Decrement the reference count. 360 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; 361 362 // If this was the last reference to the task and the `Task` has been dropped too, 363 // then we need to decide how to destroy the task. 364 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { 365 if new & (COMPLETED | CLOSED) == 0 { 366 // If the task was not completed nor closed, close it and schedule one more time so 367 // that its future gets dropped by the executor. 368 (*raw.header) 369 .state 370 .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); 371 Self::schedule(ptr); 372 } else { 373 // Otherwise, destroy the task right away. 374 Self::destroy(ptr); 375 } 376 } 377 } 378 379 /// Drops a task reference (`Runnable` or `Waker`). 380 /// 381 /// This function will decrement the reference count. If it drops down to zero and the 382 /// associated `Task` handle has been dropped too, then the task gets destroyed. 383 #[inline] drop_ref(ptr: *const ())384 unsafe fn drop_ref(ptr: *const ()) { 385 let raw = Self::from_ptr(ptr); 386 387 // Decrement the reference count. 388 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; 389 390 // If this was the last reference to the task and the `Task` has been dropped too, 391 // then destroy the task. 392 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { 393 Self::destroy(ptr); 394 } 395 } 396 397 /// Schedules a task for running. 398 /// 399 /// This function doesn't modify the state of the task. It only passes the task reference to 400 /// its schedule function. schedule(ptr: *const ())401 unsafe fn schedule(ptr: *const ()) { 402 let raw = Self::from_ptr(ptr); 403 404 // If the schedule function has captured variables, create a temporary waker that prevents 405 // the task from getting deallocated while the function is being invoked. 406 let _waker; 407 if mem::size_of::<S>() > 0 { 408 _waker = Waker::from_raw(Self::clone_waker(ptr)); 409 } 410 411 let task = Runnable { 412 ptr: NonNull::new_unchecked(ptr as *mut ()), 413 }; 414 (*raw.schedule)(task); 415 } 416 417 /// Drops the future inside a task. 418 #[inline] drop_future(ptr: *const ())419 unsafe fn drop_future(ptr: *const ()) { 420 let raw = Self::from_ptr(ptr); 421 422 // We need a safeguard against panics because the destructor can panic. 423 abort_on_panic(|| { 424 raw.future.drop_in_place(); 425 }) 426 } 427 428 /// Returns a pointer to the output inside a task. get_output(ptr: *const ()) -> *const ()429 unsafe fn get_output(ptr: *const ()) -> *const () { 430 let raw = Self::from_ptr(ptr); 431 raw.output as *const () 432 } 433 434 /// Cleans up task's resources and deallocates it. 435 /// 436 /// The schedule function will be dropped, and the task will then get deallocated. 437 /// The task must be closed before this function is called. 438 #[inline] destroy(ptr: *const ())439 unsafe fn destroy(ptr: *const ()) { 440 let raw = Self::from_ptr(ptr); 441 let task_layout = Self::task_layout(); 442 443 // We need a safeguard against panics because destructors can panic. 444 abort_on_panic(|| { 445 // Drop the schedule function. 446 (raw.schedule as *mut S).drop_in_place(); 447 }); 448 449 // Finally, deallocate the memory reserved by the task. 450 alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout); 451 } 452 453 /// Runs a task. 454 /// 455 /// If polling its future panics, the task will be closed and the panic will be propagated into 456 /// the caller. run(ptr: *const ()) -> bool457 unsafe fn run(ptr: *const ()) -> bool { 458 let raw = Self::from_ptr(ptr); 459 460 // Create a context from the raw task pointer and the vtable inside the its header. 461 let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE))); 462 let cx = &mut Context::from_waker(&waker); 463 464 let mut state = (*raw.header).state.load(Ordering::Acquire); 465 466 // Update the task's state before polling its future. 467 loop { 468 // If the task has already been closed, drop the task reference and return. 469 if state & CLOSED != 0 { 470 // Drop the future. 471 Self::drop_future(ptr); 472 473 // Mark the task as unscheduled. 474 let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); 475 476 // Take the awaiter out. 477 let mut awaiter = None; 478 if state & AWAITER != 0 { 479 awaiter = (*raw.header).take(None); 480 } 481 482 // Drop the task reference. 483 Self::drop_ref(ptr); 484 485 // Notify the awaiter that the future has been dropped. 486 if let Some(w) = awaiter { 487 abort_on_panic(|| w.wake()); 488 } 489 return false; 490 } 491 492 // Mark the task as unscheduled and running. 493 match (*raw.header).state.compare_exchange_weak( 494 state, 495 (state & !SCHEDULED) | RUNNING, 496 Ordering::AcqRel, 497 Ordering::Acquire, 498 ) { 499 Ok(_) => { 500 // Update the state because we're continuing with polling the future. 501 state = (state & !SCHEDULED) | RUNNING; 502 break; 503 } 504 Err(s) => state = s, 505 } 506 } 507 508 // Poll the inner future, but surround it with a guard that closes the task in case polling 509 // panics. 510 let guard = Guard(raw); 511 let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx); 512 mem::forget(guard); 513 514 match poll { 515 Poll::Ready(out) => { 516 // Replace the future with its output. 517 Self::drop_future(ptr); 518 raw.output.write(out); 519 520 // The task is now completed. 521 loop { 522 // If the `Task` is dropped, we'll need to close it and drop the output. 523 let new = if state & TASK == 0 { 524 (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED 525 } else { 526 (state & !RUNNING & !SCHEDULED) | COMPLETED 527 }; 528 529 // Mark the task as not running and completed. 530 match (*raw.header).state.compare_exchange_weak( 531 state, 532 new, 533 Ordering::AcqRel, 534 Ordering::Acquire, 535 ) { 536 Ok(_) => { 537 // If the `Task` is dropped or if the task was closed while running, 538 // now it's time to drop the output. 539 if state & TASK == 0 || state & CLOSED != 0 { 540 // Drop the output. 541 abort_on_panic(|| raw.output.drop_in_place()); 542 } 543 544 // Take the awaiter out. 545 let mut awaiter = None; 546 if state & AWAITER != 0 { 547 awaiter = (*raw.header).take(None); 548 } 549 550 // Drop the task reference. 551 Self::drop_ref(ptr); 552 553 // Notify the awaiter that the future has been dropped. 554 if let Some(w) = awaiter { 555 abort_on_panic(|| w.wake()); 556 } 557 break; 558 } 559 Err(s) => state = s, 560 } 561 } 562 } 563 Poll::Pending => { 564 let mut future_dropped = false; 565 566 // The task is still not completed. 567 loop { 568 // If the task was closed while running, we'll need to unschedule in case it 569 // was woken up and then destroy it. 570 let new = if state & CLOSED != 0 { 571 state & !RUNNING & !SCHEDULED 572 } else { 573 state & !RUNNING 574 }; 575 576 if state & CLOSED != 0 && !future_dropped { 577 // The thread that closed the task didn't drop the future because it was 578 // running so now it's our responsibility to do so. 579 Self::drop_future(ptr); 580 future_dropped = true; 581 } 582 583 // Mark the task as not running. 584 match (*raw.header).state.compare_exchange_weak( 585 state, 586 new, 587 Ordering::AcqRel, 588 Ordering::Acquire, 589 ) { 590 Ok(state) => { 591 // If the task was closed while running, we need to notify the awaiter. 592 // If the task was woken up while running, we need to schedule it. 593 // Otherwise, we just drop the task reference. 594 if state & CLOSED != 0 { 595 // Take the awaiter out. 596 let mut awaiter = None; 597 if state & AWAITER != 0 { 598 awaiter = (*raw.header).take(None); 599 } 600 601 // Drop the task reference. 602 Self::drop_ref(ptr); 603 604 // Notify the awaiter that the future has been dropped. 605 if let Some(w) = awaiter { 606 abort_on_panic(|| w.wake()); 607 } 608 } else if state & SCHEDULED != 0 { 609 // The thread that woke the task up didn't reschedule it because 610 // it was running so now it's our responsibility to do so. 611 Self::schedule(ptr); 612 return true; 613 } else { 614 // Drop the task reference. 615 Self::drop_ref(ptr); 616 } 617 break; 618 } 619 Err(s) => state = s, 620 } 621 } 622 } 623 } 624 625 return false; 626 627 /// A guard that closes the task if polling its future panics. 628 struct Guard<F, T, S>(RawTask<F, T, S>) 629 where 630 F: Future<Output = T>, 631 S: Fn(Runnable); 632 633 impl<F, T, S> Drop for Guard<F, T, S> 634 where 635 F: Future<Output = T>, 636 S: Fn(Runnable), 637 { 638 fn drop(&mut self) { 639 let raw = self.0; 640 let ptr = raw.header as *const (); 641 642 unsafe { 643 let mut state = (*raw.header).state.load(Ordering::Acquire); 644 645 loop { 646 // If the task was closed while running, then unschedule it, drop its 647 // future, and drop the task reference. 648 if state & CLOSED != 0 { 649 // The thread that closed the task didn't drop the future because it 650 // was running so now it's our responsibility to do so. 651 RawTask::<F, T, S>::drop_future(ptr); 652 653 // Mark the task as not running and not scheduled. 654 (*raw.header) 655 .state 656 .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); 657 658 // Take the awaiter out. 659 let mut awaiter = None; 660 if state & AWAITER != 0 { 661 awaiter = (*raw.header).take(None); 662 } 663 664 // Drop the task reference. 665 RawTask::<F, T, S>::drop_ref(ptr); 666 667 // Notify the awaiter that the future has been dropped. 668 if let Some(w) = awaiter { 669 abort_on_panic(|| w.wake()); 670 } 671 break; 672 } 673 674 // Mark the task as not running, not scheduled, and closed. 675 match (*raw.header).state.compare_exchange_weak( 676 state, 677 (state & !RUNNING & !SCHEDULED) | CLOSED, 678 Ordering::AcqRel, 679 Ordering::Acquire, 680 ) { 681 Ok(state) => { 682 // Drop the future because the task is now closed. 683 RawTask::<F, T, S>::drop_future(ptr); 684 685 // Take the awaiter out. 686 let mut awaiter = None; 687 if state & AWAITER != 0 { 688 awaiter = (*raw.header).take(None); 689 } 690 691 // Drop the task reference. 692 RawTask::<F, T, S>::drop_ref(ptr); 693 694 // Notify the awaiter that the future has been dropped. 695 if let Some(w) = awaiter { 696 abort_on_panic(|| w.wake()); 697 } 698 break; 699 } 700 Err(s) => state = s, 701 } 702 } 703 } 704 } 705 } 706 } 707 } 708