use crate::future::Future;
use crate::runtime::task::core::{Cell, Core, Header, Trailer};
use crate::runtime::task::state::{Snapshot, State};
use crate::runtime::task::waker::waker_ref;
use crate::runtime::task::{Id, JoinError, Notified, RawTask, Schedule, Task};

use std::any::Any;
use std::mem;
use std::mem::ManuallyDrop;
use std::panic;
use std::ptr::NonNull;
use std::task::{Context, Poll, Waker};

/// Typed raw task handle.
pub(super) struct Harness<T: Future, S: 'static> {
    cell: NonNull<Cell<T, S>>,
}

impl<T, S> Harness<T, S>
where
    T: Future,
    S: 'static,
{
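    /// Creates a typed `Harness` from a raw pointer to the task's `Header`.
    /// The `Header` sits at the beginning of the `Cell`, so the pointer can be
    /// cast to the full `Cell<T, S>`.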
    pub(super) unsafe fn from_raw(ptr: NonNull<Header>) -> Harness<T, S> {
        Harness {
            cell: ptr.cast::<Cell<T, S>>(),
        }
    }

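    /// Returns a raw pointer to the task's `Header`.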
    fn header_ptr(&self) -> NonNull<Header> {
        self.cell.cast()
    }

    fn header(&self) -> &Header {
        unsafe { &*self.header_ptr().as_ptr() }
    }

    fn state(&self) -> &State {
        &self.header().state
    }

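    /// Returns a reference to the task's `Trailer`, which stores the join waker.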
    fn trailer(&self) -> &Trailer {
        unsafe { &self.cell.as_ref().trailer }
    }

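    /// Returns a reference to the task's `Core`, which holds the scheduler and
    /// the future/output stage.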
    fn core(&self) -> &Core<T, S> {
        unsafe { &self.cell.as_ref().core }
    }
}

/// Task operations that can be implemented without being generic over the
/// scheduler or task. Only one version of these methods should exist in the
/// final binary.
impl RawTask {
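    /// Drops a ref-count. If this was the last ref-count, the task is
    /// deallocated.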
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// This call consumes a ref-count and notifies the task. This will create a
    /// new Notified and submit it if necessary.
    ///
    /// The caller does not need to hold a ref-count besides the one that was
    /// passed to this call.
    pub(super) fn wake_by_val(&self) {
        use super::state::TransitionToNotifiedByVal;

        match self.state().transition_to_notified_by_val() {
            TransitionToNotifiedByVal::Submit => {
                // The caller has given us a ref-count, and the transition has
                // created a new ref-count, so we now hold two. We turn the new
                // ref-count into a Notified and pass it to the call to `schedule`.
                //
                // The old ref-count is retained for now to ensure that the task
                // is not dropped during the call to `schedule` if the call
                // drops the task it was given.
                self.schedule();

                // Now that we have completed the call to schedule, we can
                // release our ref-count.
                self.drop_reference();
            }
            TransitionToNotifiedByVal::Dealloc => {
                self.dealloc();
            }
            TransitionToNotifiedByVal::DoNothing => {}
        }
    }

    /// This call notifies the task. It will not consume any ref-counts, but the
    /// caller should hold a ref-count. This will create a new Notified and
    /// submit it if necessary.
    pub(super) fn wake_by_ref(&self) {
        use super::state::TransitionToNotifiedByRef;

        match self.state().transition_to_notified_by_ref() {
            TransitionToNotifiedByRef::Submit => {
                // The transition above incremented the ref-count for a new task
                // and the caller also holds a ref-count. The caller's ref-count
                // ensures that the task is not destroyed even if the new task
                // is dropped before `schedule` returns.
                self.schedule();
            }
            TransitionToNotifiedByRef::DoNothing => {}
        }
    }

    /// Remotely aborts the task.
    ///
    /// The caller should hold a ref-count, but we do not consume it.
    ///
    /// This is similar to `shutdown` except that it asks the runtime to perform
    /// the shutdown. This is necessary to avoid the shutdown happening in the
    /// wrong thread for non-Send tasks.
    pub(super) fn remote_abort(&self) {
        if self.state().transition_to_notified_and_cancel() {
            // The transition has created a new ref-count, which we turn into
            // a Notified and pass to the task.
            //
            // Since the caller holds a ref-count, the task cannot be destroyed
            // before the call to `schedule` returns even if the call drops the
            // `Notified` internally.
            self.schedule();
        }
    }

    /// Tries to set the waker that is notified when the task completes. Returns
    /// true if the task has already completed. If this call returns false, then
    /// the waker will not be notified.
    pub(super) fn try_set_join_waker(&self, waker: &Waker) -> bool {
        can_read_output(self.header(), self.trailer(), waker)
    }
}

impl<T, S> Harness<T, S>
where
    T: Future,
    S: Schedule,
{
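    /// Drops a ref-count. If this was the last ref-count, the task is
    /// deallocated.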
    pub(super) fn drop_reference(self) {
        if self.state().ref_dec() {
            self.dealloc();
        }
    }

    /// Polls the inner future. A ref-count is consumed.
    ///
    /// All necessary state checks and transitions are performed.
    /// Panics raised while polling the future are handled.
    pub(super) fn poll(self) {
        // We pass our ref-count to `poll_inner`.
        match self.poll_inner() {
            PollFuture::Notified => {
                // The `poll_inner` call has given us two ref-counts back.
                // We give one of them to a new task and call `yield_now`.
                self.core()
                    .scheduler
                    .yield_now(Notified(self.get_new_task()));

                // The remaining ref-count is now dropped. We kept the extra
                // ref-count until now to ensure that even if the `yield_now`
                // call drops the provided task, the task isn't deallocated
                // until after `yield_now` returns.
                self.drop_reference();
            }
            PollFuture::Complete => {
                self.complete();
            }
            PollFuture::Dealloc => {
                self.dealloc();
            }
            PollFuture::Done => (),
        }
    }

    /// Polls the task and cancels it if necessary. This takes ownership of a
    /// ref-count.
    ///
    /// If the return value is Notified, the caller is given ownership of two
    /// ref-counts.
    ///
    /// If the return value is Complete, the caller is given ownership of a
    /// single ref-count, which should be passed on to `complete`.
    ///
    /// If the return value is Dealloc, then this call consumed the last
    /// ref-count and the caller should call `dealloc`.
    ///
    /// Otherwise the ref-count is consumed and the caller should not access
    /// `self` again.
    fn poll_inner(&self) -> PollFuture {
        use super::state::{TransitionToIdle, TransitionToRunning};

        match self.state().transition_to_running() {
            TransitionToRunning::Success => {
                // Separated to reduce LLVM codegen
                fn transition_result_to_poll_future(result: TransitionToIdle) -> PollFuture {
                    match result {
                        TransitionToIdle::Ok => PollFuture::Done,
                        TransitionToIdle::OkNotified => PollFuture::Notified,
                        TransitionToIdle::OkDealloc => PollFuture::Dealloc,
                        TransitionToIdle::Cancelled => PollFuture::Complete,
                    }
                }
                let header_ptr = self.header_ptr();
                let waker_ref = waker_ref::<S>(&header_ptr);
                let cx = Context::from_waker(&waker_ref);
                let res = poll_future(self.core(), cx);

                if res == Poll::Ready(()) {
                    // The future completed. Move on to complete the task.
                    return PollFuture::Complete;
                }

                let transition_res = self.state().transition_to_idle();
                if let TransitionToIdle::Cancelled = transition_res {
                    // The transition to idle failed because the task was
                    // cancelled during the poll.
                    cancel_task(self.core());
                }
                transition_result_to_poll_future(transition_res)
            }
            TransitionToRunning::Cancelled => {
                cancel_task(self.core());
                PollFuture::Complete
            }
            TransitionToRunning::Failed => PollFuture::Done,
            TransitionToRunning::Dealloc => PollFuture::Dealloc,
        }
    }

    /// Forcibly shuts down the task.
    ///
    /// Attempts to transition to `Running` in order to forcibly shut down the
    /// task. If the task is currently running or in a state of completion, then
    /// there is nothing further to do. When the task completes running, it will
    /// notice the `CANCELLED` bit and finalize the task.
    pub(super) fn shutdown(self) {
        if !self.state().transition_to_shutdown() {
            // The task is concurrently running. No further work needed.
            self.drop_reference();
            return;
        }

        // By transitioning the lifecycle to `Running`, we have permission to
        // drop the future.
        cancel_task(self.core());
        self.complete();
    }

    pub(super) fn dealloc(self) {
        // Release the join waker, if there is one.
        self.trailer().waker.with_mut(drop);

        // Check causality
        self.core().stage.with_mut(drop);

        // Safety: The caller of this method just transitioned our ref-count to
        // zero, so it is our responsibility to release the allocation.
        //
        // We don't hold any references into the allocation at this point, but
        // it is possible for another thread to still hold a `&State` into the
        // allocation if that other thread has decremented its last ref-count,
        // but has not yet returned from the relevant method on `State`.
        //
        // However, the `State` type consists of just an `AtomicUsize`, and an
        // `AtomicUsize` wraps the entirety of its contents in an `UnsafeCell`.
        // As explained in the documentation for `UnsafeCell`, such references
        // are allowed to be dangling after their last use, even if the
        // reference has not yet gone out of scope.
        unsafe {
            drop(Box::from_raw(self.cell.as_ptr()));
        }
    }

    // ===== join handle =====

    /// Read the task output into `dst`.
    pub(super) fn try_read_output(self, dst: &mut Poll<super::Result<T::Output>>, waker: &Waker) {
        if can_read_output(self.header(), self.trailer(), waker) {
            *dst = Poll::Ready(self.core().take_output());
        }
    }

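    /// Releases the `JoinHandle`'s interest in the task and drops its
    /// ref-count. If the task has already completed, the output is dropped
    /// here rather than being left in the task structure, since it may not be
    /// `Send`.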
    pub(super) fn drop_join_handle_slow(self) {
        // Try to unset `JOIN_INTEREST`. This must be done as a first step in
        // case the task concurrently completed.
        if self.state().unset_join_interested().is_err() {
            // It is our responsibility to drop the output. This is critical as
            // the task output may not be `Send` and as such must remain with
            // the scheduler or `JoinHandle`. i.e. if the output remains in the
            // task structure until the task is deallocated, it may be dropped
            // by a Waker on any arbitrary thread.
            //
            // Panics are delivered to the user via the `JoinHandle`. Given that
            // they are dropping the `JoinHandle`, we assume they are not
            // interested in the panic and swallow it.
            let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
                self.core().drop_future_or_output();
            }));
        }

        // Drop the `JoinHandle` reference, possibly deallocating the task
        self.drop_reference();
    }

    // ====== internal ======

    /// Completes the task. This method assumes that the state is RUNNING.
    fn complete(self) {
        // The future has completed and its output has been written to the task
        // stage. We transition from running to complete.

        let snapshot = self.state().transition_to_complete();

        // We catch panics here in case dropping the future or waking the
        // JoinHandle panics.
        let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
            if !snapshot.is_join_interested() {
                // The `JoinHandle` is not interested in the output of
                // this task. It is our responsibility to drop the
                // output.
                self.core().drop_future_or_output();
            } else if snapshot.is_join_waker_set() {
                // Notify the waker. Reading the waker field is safe per rule 4
                // in task/mod.rs, since the JOIN_WAKER bit is set and the call
                // to transition_to_complete() above set the COMPLETE bit.
                self.trailer().wake_join();
            }
        }));

        // The task has completed execution and will no longer be scheduled.
        let num_release = self.release();

        if self.state().transition_to_terminal(num_release) {
            self.dealloc();
        }
    }

    /// Releases the task from the scheduler. Returns the number of ref-counts
    /// that should be decremented.
    fn release(&self) -> usize {
        // We don't actually increment the ref-count here, but the new task is
        // never destroyed, so that's ok.
        let me = ManuallyDrop::new(self.get_new_task());

        if let Some(task) = self.core().scheduler.release(&me) {
            mem::forget(task);
            2
        } else {
            1
        }
    }

    /// Creates a new task that holds its own ref-count.
    ///
    /// # Safety
    ///
    /// Any use of `self` after this call must ensure that a ref-count to the
    /// task holds the task alive until after the use of `self`. Passing the
    /// returned Task to any method on `self` is unsound if dropping the Task
    /// could drop `self` before the call on `self` returned.
    fn get_new_task(&self) -> Task<S> {
        // safety: The header is at the beginning of the cell, so this cast is
        // safe.
        unsafe { Task::from_raw(self.cell.cast()) }
    }
}

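/// Returns `true` if the task has completed and its output can be read.
/// Otherwise, attempts to store `waker` as the join waker and returns `false`.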
fn can_read_output(header: &Header, trailer: &Trailer, waker: &Waker) -> bool {
    // Load a snapshot of the current task state
    let snapshot = header.state.load();

    debug_assert!(snapshot.is_join_interested());

    if !snapshot.is_complete() {
        // If the task is not complete, try storing the provided waker in the
        // task's waker field.

        let res = if snapshot.is_join_waker_set() {
            // If JOIN_WAKER is set, then JoinHandle has previously stored a
            // waker in the waker field per step (iii) of rule 5 in task/mod.rs.

            // Optimization: if the stored waker and the provided waker wake the
            // same task, then return without touching the waker field. (Reading
            // the waker field below is safe per rule 3 in task/mod.rs.)
            if unsafe { trailer.will_wake(waker) } {
                return false;
            }

            // Otherwise swap the stored waker with the provided waker by
            // following rule 5 in task/mod.rs.
            header
                .state
                .unset_waker()
                .and_then(|snapshot| set_join_waker(header, trailer, waker.clone(), snapshot))
        } else {
            // If JOIN_WAKER is unset, then JoinHandle has mutable access to the
            // waker field per rule 2 in task/mod.rs; therefore, skip step (i)
            // of rule 5 and try to store the provided waker in the waker field.
            set_join_waker(header, trailer, waker.clone(), snapshot)
        };

        match res {
            Ok(_) => return false,
            Err(snapshot) => {
                assert!(snapshot.is_complete());
            }
        }
    }
    true
}

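/// Attempts to store `waker` in the trailer and set the `JOIN_WAKER` bit.
/// If the state update fails because the task completed concurrently, the
/// waker is cleared again and the failing snapshot is returned as the error.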
fn set_join_waker(
    header: &Header,
    trailer: &Trailer,
    waker: Waker,
    snapshot: Snapshot,
) -> Result<Snapshot, Snapshot> {
    assert!(snapshot.is_join_interested());
    assert!(!snapshot.is_join_waker_set());

    // Safety: Only the `JoinHandle` may set the `waker` field. When
    // `JOIN_INTEREST` is **not** set, nothing else will touch the field.
    unsafe {
        trailer.set_waker(Some(waker));
    }

    // Update the `JoinWaker` state accordingly
    let res = header.state.set_join_waker();

    // If the state could not be updated, then clear the join waker
    if res.is_err() {
        unsafe {
            trailer.set_waker(None);
        }
    }

    res
}

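/// Outcome of `poll_inner`. See the documentation on `poll_inner` for the
/// ref-count obligations associated with each variant.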
enum PollFuture {
    Complete,
    Notified,
    Done,
    Dealloc,
}

/// Cancels the task and stores the appropriate error in the stage field.
fn cancel_task<T: Future, S: Schedule>(core: &Core<T, S>) {
    // Drop the future inside a panic guard.
    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        core.drop_future_or_output();
    }));

    core.store_output(Err(panic_result_to_join_error(core.task_id, res)));
}

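/// Maps the result of running task code under `catch_unwind` to a
/// `JoinError`: a clean run means the task was cancelled, while a caught
/// panic is reported as a panic error.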
fn panic_result_to_join_error(
    task_id: Id,
    res: Result<(), Box<dyn Any + Send + 'static>>,
) -> JoinError {
    match res {
        Ok(()) => JoinError::cancelled(task_id),
        Err(panic) => JoinError::panic(task_id, panic),
    }
}

/// Polls the future. If the future completes, the output is written to the
/// stage field.
fn poll_future<T: Future, S: Schedule>(core: &Core<T, S>, cx: Context<'_>) -> Poll<()> {
    // Poll the future.
    let output = panic::catch_unwind(panic::AssertUnwindSafe(|| {
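        // Drop guard: if `poll` panics, the guard's `Drop` impl drops the
        // future before the panic propagates out of `catch_unwind`.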
        struct Guard<'a, T: Future, S: Schedule> {
            core: &'a Core<T, S>,
        }
        impl<'a, T: Future, S: Schedule> Drop for Guard<'a, T, S> {
            fn drop(&mut self) {
                // If the future panics on poll, we drop it inside the panic
                // guard.
                self.core.drop_future_or_output();
            }
        }
        let guard = Guard { core };
        let res = guard.core.poll(cx);
        mem::forget(guard);
        res
    }));

    // Prepare output for being placed in the core stage.
    let output = match output {
        Ok(Poll::Pending) => return Poll::Pending,
        Ok(Poll::Ready(output)) => Ok(output),
        Err(panic) => Err(panic_to_error(&core.scheduler, core.task_id, panic)),
    };

    // Catch and ignore panics if the future panics on drop.
    let res = panic::catch_unwind(panic::AssertUnwindSafe(|| {
        core.store_output(output);
    }));

    if res.is_err() {
        core.scheduler.unhandled_panic();
    }

    Poll::Ready(())
}

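/// Reports the panic to the scheduler and converts the panic payload into a
/// `JoinError`. This path is only reached when polling the future panics.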
#[cold]
fn panic_to_error<S: Schedule>(
    scheduler: &S,
    task_id: Id,
    panic: Box<dyn Any + Send + 'static>,
) -> JoinError {
    scheduler.unhandled_panic();
    JoinError::panic(task_id, panic)
}