1 use alloc::alloc::Layout; 2 use core::cell::UnsafeCell; 3 use core::future::Future; 4 use core::mem::{self, ManuallyDrop}; 5 use core::pin::Pin; 6 use core::ptr::NonNull; 7 use core::sync::atomic::{AtomicUsize, Ordering}; 8 use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; 9 10 use crate::header::Header; 11 use crate::state::*; 12 use crate::utils::{abort, abort_on_panic, extend}; 13 use crate::Runnable; 14 15 /// The vtable for a task. 16 pub(crate) struct TaskVTable { 17 /// Schedules the task. 18 pub(crate) schedule: unsafe fn(*const ()), 19 20 /// Drops the future inside the task. 21 pub(crate) drop_future: unsafe fn(*const ()), 22 23 /// Returns a pointer to the output stored after completion. 24 pub(crate) get_output: unsafe fn(*const ()) -> *const (), 25 26 /// Drops the task reference (`Runnable` or `Waker`). 27 pub(crate) drop_ref: unsafe fn(ptr: *const ()), 28 29 /// Destroys the task. 30 pub(crate) destroy: unsafe fn(*const ()), 31 32 /// Runs the task. 33 pub(crate) run: unsafe fn(*const ()) -> bool, 34 35 /// Creates a new waker associated with the task. 36 pub(crate) clone_waker: unsafe fn(ptr: *const ()) -> RawWaker, 37 } 38 39 /// Memory layout of a task. 40 /// 41 /// This struct contains the following information: 42 /// 43 /// 1. How to allocate and deallocate the task. 44 /// 2. How to access the fields inside the task. 45 #[derive(Clone, Copy)] 46 pub(crate) struct TaskLayout { 47 /// Memory layout of the whole task. 48 pub(crate) layout: Layout, 49 50 /// Offset into the task at which the schedule function is stored. 51 pub(crate) offset_s: usize, 52 53 /// Offset into the task at which the future is stored. 54 pub(crate) offset_f: usize, 55 56 /// Offset into the task at which the output is stored. 57 pub(crate) offset_r: usize, 58 } 59 60 /// Raw pointers to the fields inside a task. 61 pub(crate) struct RawTask<F, T, S> { 62 /// The task header. 63 pub(crate) header: *const Header, 64 65 /// The schedule function. 66 pub(crate) schedule: *const S, 67 68 /// The future. 69 pub(crate) future: *mut F, 70 71 /// The output of the future. 72 pub(crate) output: *mut T, 73 } 74 75 impl<F, T, S> Copy for RawTask<F, T, S> {} 76 77 impl<F, T, S> Clone for RawTask<F, T, S> { clone(&self) -> Self78 fn clone(&self) -> Self { 79 *self 80 } 81 } 82 83 impl<F, T, S> RawTask<F, T, S> 84 where 85 F: Future<Output = T>, 86 S: Fn(Runnable), 87 { 88 const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( 89 Self::clone_waker, 90 Self::wake, 91 Self::wake_by_ref, 92 Self::drop_waker, 93 ); 94 95 /// Allocates a task with the given `future` and `schedule` function. 96 /// 97 /// It is assumed that initially only the `Runnable` and the `Task` exist. allocate(future: F, schedule: S) -> NonNull<()>98 pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> { 99 // Compute the layout of the task for allocation. Abort if the computation fails. 100 let task_layout = abort_on_panic(|| Self::task_layout()); 101 102 unsafe { 103 // Allocate enough space for the entire task. 104 let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) { 105 None => abort(), 106 Some(p) => p, 107 }; 108 109 let raw = Self::from_ptr(ptr.as_ptr()); 110 111 // Write the header as the first field of the task. 112 (raw.header as *mut Header).write(Header { 113 state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE), 114 awaiter: UnsafeCell::new(None), 115 vtable: &TaskVTable { 116 schedule: Self::schedule, 117 drop_future: Self::drop_future, 118 get_output: Self::get_output, 119 drop_ref: Self::drop_ref, 120 destroy: Self::destroy, 121 run: Self::run, 122 clone_waker: Self::clone_waker, 123 }, 124 }); 125 126 // Write the schedule function as the third field of the task. 127 (raw.schedule as *mut S).write(schedule); 128 129 // Write the future as the fourth field of the task. 130 raw.future.write(future); 131 132 ptr 133 } 134 } 135 136 /// Creates a `RawTask` from a raw task pointer. 137 #[inline] from_ptr(ptr: *const ()) -> Self138 pub(crate) fn from_ptr(ptr: *const ()) -> Self { 139 let task_layout = Self::task_layout(); 140 let p = ptr as *const u8; 141 142 unsafe { 143 Self { 144 header: p as *const Header, 145 schedule: p.add(task_layout.offset_s) as *const S, 146 future: p.add(task_layout.offset_f) as *mut F, 147 output: p.add(task_layout.offset_r) as *mut T, 148 } 149 } 150 } 151 152 /// Returns the memory layout for a task. 153 #[inline] task_layout() -> TaskLayout154 fn task_layout() -> TaskLayout { 155 // Compute the layouts for `Header`, `S`, `F`, and `T`. 156 let layout_header = Layout::new::<Header>(); 157 let layout_s = Layout::new::<S>(); 158 let layout_f = Layout::new::<F>(); 159 let layout_r = Layout::new::<T>(); 160 161 // Compute the layout for `union { F, T }`. 162 let size_union = layout_f.size().max(layout_r.size()); 163 let align_union = layout_f.align().max(layout_r.align()); 164 let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) }; 165 166 // Compute the layout for `Header` followed `S` and `union { F, T }`. 167 let layout = layout_header; 168 let (layout, offset_s) = extend(layout, layout_s); 169 let (layout, offset_union) = extend(layout, layout_union); 170 let offset_f = offset_union; 171 let offset_r = offset_union; 172 173 TaskLayout { 174 layout, 175 offset_s, 176 offset_f, 177 offset_r, 178 } 179 } 180 181 /// Wakes a waker. wake(ptr: *const ())182 unsafe fn wake(ptr: *const ()) { 183 // This is just an optimization. If the schedule function has captured variables, then 184 // we'll do less reference counting if we wake the waker by reference and then drop it. 185 if mem::size_of::<S>() > 0 { 186 Self::wake_by_ref(ptr); 187 Self::drop_waker(ptr); 188 return; 189 } 190 191 let raw = Self::from_ptr(ptr); 192 193 let mut state = (*raw.header).state.load(Ordering::Acquire); 194 195 loop { 196 // If the task is completed or closed, it can't be woken up. 197 if state & (COMPLETED | CLOSED) != 0 { 198 // Drop the waker. 199 Self::drop_waker(ptr); 200 break; 201 } 202 203 // If the task is already scheduled, we just need to synchronize with the thread that 204 // will run the task by "publishing" our current view of the memory. 205 if state & SCHEDULED != 0 { 206 // Update the state without actually modifying it. 207 match (*raw.header).state.compare_exchange_weak( 208 state, 209 state, 210 Ordering::AcqRel, 211 Ordering::Acquire, 212 ) { 213 Ok(_) => { 214 // Drop the waker. 215 Self::drop_waker(ptr); 216 break; 217 } 218 Err(s) => state = s, 219 } 220 } else { 221 // Mark the task as scheduled. 222 match (*raw.header).state.compare_exchange_weak( 223 state, 224 state | SCHEDULED, 225 Ordering::AcqRel, 226 Ordering::Acquire, 227 ) { 228 Ok(_) => { 229 // If the task is not yet scheduled and isn't currently running, now is the 230 // time to schedule it. 231 if state & RUNNING == 0 { 232 // Schedule the task. 233 Self::schedule(ptr); 234 } else { 235 // Drop the waker. 236 Self::drop_waker(ptr); 237 } 238 239 break; 240 } 241 Err(s) => state = s, 242 } 243 } 244 } 245 } 246 247 /// Wakes a waker by reference. wake_by_ref(ptr: *const ())248 unsafe fn wake_by_ref(ptr: *const ()) { 249 let raw = Self::from_ptr(ptr); 250 251 let mut state = (*raw.header).state.load(Ordering::Acquire); 252 253 loop { 254 // If the task is completed or closed, it can't be woken up. 255 if state & (COMPLETED | CLOSED) != 0 { 256 break; 257 } 258 259 // If the task is already scheduled, we just need to synchronize with the thread that 260 // will run the task by "publishing" our current view of the memory. 261 if state & SCHEDULED != 0 { 262 // Update the state without actually modifying it. 263 match (*raw.header).state.compare_exchange_weak( 264 state, 265 state, 266 Ordering::AcqRel, 267 Ordering::Acquire, 268 ) { 269 Ok(_) => break, 270 Err(s) => state = s, 271 } 272 } else { 273 // If the task is not running, we can schedule right away. 274 let new = if state & RUNNING == 0 { 275 (state | SCHEDULED) + REFERENCE 276 } else { 277 state | SCHEDULED 278 }; 279 280 // Mark the task as scheduled. 281 match (*raw.header).state.compare_exchange_weak( 282 state, 283 new, 284 Ordering::AcqRel, 285 Ordering::Acquire, 286 ) { 287 Ok(_) => { 288 // If the task is not running, now is the time to schedule. 289 if state & RUNNING == 0 { 290 // If the reference count overflowed, abort. 291 if state > isize::max_value() as usize { 292 abort(); 293 } 294 295 // Schedule the task. There is no need to call `Self::schedule(ptr)` 296 // because the schedule function cannot be destroyed while the waker is 297 // still alive. 298 let task = Runnable { 299 ptr: NonNull::new_unchecked(ptr as *mut ()), 300 }; 301 (*raw.schedule)(task); 302 } 303 304 break; 305 } 306 Err(s) => state = s, 307 } 308 } 309 } 310 } 311 312 /// Clones a waker. clone_waker(ptr: *const ()) -> RawWaker313 unsafe fn clone_waker(ptr: *const ()) -> RawWaker { 314 let raw = Self::from_ptr(ptr); 315 316 // Increment the reference count. With any kind of reference-counted data structure, 317 // relaxed ordering is appropriate when incrementing the counter. 318 let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed); 319 320 // If the reference count overflowed, abort. 321 if state > isize::max_value() as usize { 322 abort(); 323 } 324 325 RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE) 326 } 327 328 /// Drops a waker. 329 /// 330 /// This function will decrement the reference count. If it drops down to zero, the associated 331 /// `Task` has been dropped too, and the task has not been completed, then it will get 332 /// scheduled one more time so that its future gets dropped by the executor. 333 #[inline] drop_waker(ptr: *const ())334 unsafe fn drop_waker(ptr: *const ()) { 335 let raw = Self::from_ptr(ptr); 336 337 // Decrement the reference count. 338 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; 339 340 // If this was the last reference to the task and the `Task` has been dropped too, 341 // then we need to decide how to destroy the task. 342 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { 343 if new & (COMPLETED | CLOSED) == 0 { 344 // If the task was not completed nor closed, close it and schedule one more time so 345 // that its future gets dropped by the executor. 346 (*raw.header) 347 .state 348 .store(SCHEDULED | CLOSED | REFERENCE, Ordering::Release); 349 Self::schedule(ptr); 350 } else { 351 // Otherwise, destroy the task right away. 352 Self::destroy(ptr); 353 } 354 } 355 } 356 357 /// Drops a task reference (`Runnable` or `Waker`). 358 /// 359 /// This function will decrement the reference count. If it drops down to zero and the 360 /// associated `Task` handle has been dropped too, then the task gets destroyed. 361 #[inline] drop_ref(ptr: *const ())362 unsafe fn drop_ref(ptr: *const ()) { 363 let raw = Self::from_ptr(ptr); 364 365 // Decrement the reference count. 366 let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE; 367 368 // If this was the last reference to the task and the `Task` has been dropped too, 369 // then destroy the task. 370 if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { 371 Self::destroy(ptr); 372 } 373 } 374 375 /// Schedules a task for running. 376 /// 377 /// This function doesn't modify the state of the task. It only passes the task reference to 378 /// its schedule function. schedule(ptr: *const ())379 unsafe fn schedule(ptr: *const ()) { 380 let raw = Self::from_ptr(ptr); 381 382 // If the schedule function has captured variables, create a temporary waker that prevents 383 // the task from getting deallocated while the function is being invoked. 384 let _waker; 385 if mem::size_of::<S>() > 0 { 386 _waker = Waker::from_raw(Self::clone_waker(ptr)); 387 } 388 389 let task = Runnable { 390 ptr: NonNull::new_unchecked(ptr as *mut ()), 391 }; 392 (*raw.schedule)(task); 393 } 394 395 /// Drops the future inside a task. 396 #[inline] drop_future(ptr: *const ())397 unsafe fn drop_future(ptr: *const ()) { 398 let raw = Self::from_ptr(ptr); 399 400 // We need a safeguard against panics because the destructor can panic. 401 abort_on_panic(|| { 402 raw.future.drop_in_place(); 403 }) 404 } 405 406 /// Returns a pointer to the output inside a task. get_output(ptr: *const ()) -> *const ()407 unsafe fn get_output(ptr: *const ()) -> *const () { 408 let raw = Self::from_ptr(ptr); 409 raw.output as *const () 410 } 411 412 /// Cleans up task's resources and deallocates it. 413 /// 414 /// The schedule function will be dropped, and the task will then get deallocated. 415 /// The task must be closed before this function is called. 416 #[inline] destroy(ptr: *const ())417 unsafe fn destroy(ptr: *const ()) { 418 let raw = Self::from_ptr(ptr); 419 let task_layout = Self::task_layout(); 420 421 // We need a safeguard against panics because destructors can panic. 422 abort_on_panic(|| { 423 // Drop the schedule function. 424 (raw.schedule as *mut S).drop_in_place(); 425 }); 426 427 // Finally, deallocate the memory reserved by the task. 428 alloc::alloc::dealloc(ptr as *mut u8, task_layout.layout); 429 } 430 431 /// Runs a task. 432 /// 433 /// If polling its future panics, the task will be closed and the panic will be propagated into 434 /// the caller. run(ptr: *const ()) -> bool435 unsafe fn run(ptr: *const ()) -> bool { 436 let raw = Self::from_ptr(ptr); 437 438 // Create a context from the raw task pointer and the vtable inside the its header. 439 let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &Self::RAW_WAKER_VTABLE))); 440 let cx = &mut Context::from_waker(&waker); 441 442 let mut state = (*raw.header).state.load(Ordering::Acquire); 443 444 // Update the task's state before polling its future. 445 loop { 446 // If the task has already been closed, drop the task reference and return. 447 if state & CLOSED != 0 { 448 // Drop the future. 449 Self::drop_future(ptr); 450 451 // Mark the task as unscheduled. 452 let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel); 453 454 // Take the awaiter out. 455 let mut awaiter = None; 456 if state & AWAITER != 0 { 457 awaiter = (*raw.header).take(None); 458 } 459 460 // Drop the task reference. 461 Self::drop_ref(ptr); 462 463 // Notify the awaiter that the future has been dropped. 464 if let Some(w) = awaiter { 465 abort_on_panic(|| w.wake()); 466 } 467 return false; 468 } 469 470 // Mark the task as unscheduled and running. 471 match (*raw.header).state.compare_exchange_weak( 472 state, 473 (state & !SCHEDULED) | RUNNING, 474 Ordering::AcqRel, 475 Ordering::Acquire, 476 ) { 477 Ok(_) => { 478 // Update the state because we're continuing with polling the future. 479 state = (state & !SCHEDULED) | RUNNING; 480 break; 481 } 482 Err(s) => state = s, 483 } 484 } 485 486 // Poll the inner future, but surround it with a guard that closes the task in case polling 487 // panics. 488 let guard = Guard(raw); 489 let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx); 490 mem::forget(guard); 491 492 match poll { 493 Poll::Ready(out) => { 494 // Replace the future with its output. 495 Self::drop_future(ptr); 496 raw.output.write(out); 497 498 // The task is now completed. 499 loop { 500 // If the `Task` is dropped, we'll need to close it and drop the output. 501 let new = if state & TASK == 0 { 502 (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED 503 } else { 504 (state & !RUNNING & !SCHEDULED) | COMPLETED 505 }; 506 507 // Mark the task as not running and completed. 508 match (*raw.header).state.compare_exchange_weak( 509 state, 510 new, 511 Ordering::AcqRel, 512 Ordering::Acquire, 513 ) { 514 Ok(_) => { 515 // If the `Task` is dropped or if the task was closed while running, 516 // now it's time to drop the output. 517 if state & TASK == 0 || state & CLOSED != 0 { 518 // Drop the output. 519 abort_on_panic(|| raw.output.drop_in_place()); 520 } 521 522 // Take the awaiter out. 523 let mut awaiter = None; 524 if state & AWAITER != 0 { 525 awaiter = (*raw.header).take(None); 526 } 527 528 // Drop the task reference. 529 Self::drop_ref(ptr); 530 531 // Notify the awaiter that the future has been dropped. 532 if let Some(w) = awaiter { 533 abort_on_panic(|| w.wake()); 534 } 535 break; 536 } 537 Err(s) => state = s, 538 } 539 } 540 } 541 Poll::Pending => { 542 let mut future_dropped = false; 543 544 // The task is still not completed. 545 loop { 546 // If the task was closed while running, we'll need to unschedule in case it 547 // was woken up and then destroy it. 548 let new = if state & CLOSED != 0 { 549 state & !RUNNING & !SCHEDULED 550 } else { 551 state & !RUNNING 552 }; 553 554 if state & CLOSED != 0 && !future_dropped { 555 // The thread that closed the task didn't drop the future because it was 556 // running so now it's our responsibility to do so. 557 Self::drop_future(ptr); 558 future_dropped = true; 559 } 560 561 // Mark the task as not running. 562 match (*raw.header).state.compare_exchange_weak( 563 state, 564 new, 565 Ordering::AcqRel, 566 Ordering::Acquire, 567 ) { 568 Ok(state) => { 569 // If the task was closed while running, we need to notify the awaiter. 570 // If the task was woken up while running, we need to schedule it. 571 // Otherwise, we just drop the task reference. 572 if state & CLOSED != 0 { 573 // Take the awaiter out. 574 let mut awaiter = None; 575 if state & AWAITER != 0 { 576 awaiter = (*raw.header).take(None); 577 } 578 579 // Drop the task reference. 580 Self::drop_ref(ptr); 581 582 // Notify the awaiter that the future has been dropped. 583 if let Some(w) = awaiter { 584 abort_on_panic(|| w.wake()); 585 } 586 } else if state & SCHEDULED != 0 { 587 // The thread that woke the task up didn't reschedule it because 588 // it was running so now it's our responsibility to do so. 589 Self::schedule(ptr); 590 return true; 591 } else { 592 // Drop the task reference. 593 Self::drop_ref(ptr); 594 } 595 break; 596 } 597 Err(s) => state = s, 598 } 599 } 600 } 601 } 602 603 return false; 604 605 /// A guard that closes the task if polling its future panics. 606 struct Guard<F, T, S>(RawTask<F, T, S>) 607 where 608 F: Future<Output = T>, 609 S: Fn(Runnable); 610 611 impl<F, T, S> Drop for Guard<F, T, S> 612 where 613 F: Future<Output = T>, 614 S: Fn(Runnable), 615 { 616 fn drop(&mut self) { 617 let raw = self.0; 618 let ptr = raw.header as *const (); 619 620 unsafe { 621 let mut state = (*raw.header).state.load(Ordering::Acquire); 622 623 loop { 624 // If the task was closed while running, then unschedule it, drop its 625 // future, and drop the task reference. 626 if state & CLOSED != 0 { 627 // The thread that closed the task didn't drop the future because it 628 // was running so now it's our responsibility to do so. 629 RawTask::<F, T, S>::drop_future(ptr); 630 631 // Mark the task as not running and not scheduled. 632 (*raw.header) 633 .state 634 .fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel); 635 636 // Take the awaiter out. 637 let mut awaiter = None; 638 if state & AWAITER != 0 { 639 awaiter = (*raw.header).take(None); 640 } 641 642 // Drop the task reference. 643 RawTask::<F, T, S>::drop_ref(ptr); 644 645 // Notify the awaiter that the future has been dropped. 646 if let Some(w) = awaiter { 647 abort_on_panic(|| w.wake()); 648 } 649 break; 650 } 651 652 // Mark the task as not running, not scheduled, and closed. 653 match (*raw.header).state.compare_exchange_weak( 654 state, 655 (state & !RUNNING & !SCHEDULED) | CLOSED, 656 Ordering::AcqRel, 657 Ordering::Acquire, 658 ) { 659 Ok(state) => { 660 // Drop the future because the task is now closed. 661 RawTask::<F, T, S>::drop_future(ptr); 662 663 // Take the awaiter out. 664 let mut awaiter = None; 665 if state & AWAITER != 0 { 666 awaiter = (*raw.header).take(None); 667 } 668 669 // Drop the task reference. 670 RawTask::<F, T, S>::drop_ref(ptr); 671 672 // Notify the awaiter that the future has been dropped. 673 if let Some(w) = awaiter { 674 abort_on_panic(|| w.wake()); 675 } 676 break; 677 } 678 Err(s) => state = s, 679 } 680 } 681 } 682 } 683 } 684 } 685 } 686