• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
//! Join handle for asynchronous tasks.
//!
//! [`JoinHandle`] is similar to a `JoinHandle` for a thread. It can be used to
//! await an asynchronous task until it finishes and to get its result.
18 
19 use std::future::Future;
20 use std::marker::PhantomData;
21 use std::pin::Pin;
22 use std::task::{Context, Poll, Waker};
23 
24 use crate::error::ScheduleError;
25 use crate::task::raw::RawTask;
26 use crate::task::state;
27 
28 /// A handle to the actual spawned task.
29 ///
30 /// This can be considered as the equivalent of [`std::thread::JoinHandle`]
31 /// for a ylong task rather than a thread.
32 ///
33 /// It could be used to join the corresponding task or cancel it.
34 /// If a `JoinHandle` is dropped, then the task continues executing in the
35 /// background and its return value is lost. There is no way to join the task
36 /// after its JoinHandle is dropped.
37 ///
38 /// # Examples
39 ///
40 /// ```
41 /// let handle = ylong_runtime::spawn(async {
42 ///     let handle2 = ylong_runtime::spawn(async { 1 });
43 ///     assert_eq!(handle2.await.unwrap(), 1);
44 /// });
45 /// ylong_runtime::block_on(handle).unwrap();
46 /// ```
47 pub struct JoinHandle<R> {
48     pub(crate) raw: RawTask,
49     marker: PhantomData<R>,
50 }
51 
52 unsafe impl<R: Send> Send for JoinHandle<R> {}
53 unsafe impl<R: Send> Sync for JoinHandle<R> {}
54 
55 impl<R> JoinHandle<R> {
new(raw: RawTask) -> JoinHandle<R>56     pub(crate) fn new(raw: RawTask) -> JoinHandle<R> {
57         JoinHandle {
58             raw,
59             marker: PhantomData,
60         }
61     }
62 
63     /// Cancels the task associating with this JoinHandle. If the task has
64     /// already finished, this method does nothing.
65     ///
66     /// When successfully canceled, `.await` on this JoinHandle will return a
67     /// `TaskCanceled` error.
cancel(&self)68     pub fn cancel(&self) {
69         unsafe {
70             self.raw.cancel();
71         }
72     }
73 
get_cancel_handle(&self) -> CancelHandle74     pub(crate) fn get_cancel_handle(&self) -> CancelHandle {
75         CancelHandle::new(self.raw)
76     }
77 
set_waker(&mut self, waker: &Waker)78     pub(crate) fn set_waker(&mut self, waker: &Waker) {
79         let cur = self.raw.header().state.get_current_state();
80         unsafe {
81             if self.raw.set_waker(cur, waker as *const Waker as *const ()) {
82                 // Task already finished, wake the waker immediately
83                 waker.wake_by_ref();
84             }
85         }
86     }
87 }
88 
89 impl<R> Unpin for JoinHandle<R> {}
90 
91 impl<R> Future for JoinHandle<R> {
92     // The type of the output needs to match with Stage::StorageData
93     type Output = Result<R, ScheduleError>;
94 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>95     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
96         let mut res = Poll::Pending;
97 
98         let cur = self.raw.header().state.get_current_state();
99         if !state::is_care_join_handle(cur) {
100             panic!("JoinHandle should not be polled after it's dropped");
101         }
102         if state::is_finished(cur) {
103             unsafe {
104                 self.raw.get_result(&mut res as *mut _ as *mut ());
105             }
106         } else {
107             unsafe {
108                 let is_finished = self.raw.set_waker(cur, cx.waker() as *const _ as *mut ());
109                 // Setting the waker may happen concurrently with task finishing.
110                 // Therefore we check one more time to see if the task is finished.
111                 if is_finished {
112                     self.raw.get_result(&mut res as *mut _ as *mut ());
113                 }
114             }
115         }
116         res
117     }
118 }
119 
120 impl<R> Drop for JoinHandle<R> {
drop(&mut self)121     fn drop(&mut self) {
122         self.raw.drop_join_handle();
123     }
124 }
125 
126 /// A handle to cancel the spawned task.
127 ///
128 /// `CancelHandle` cannot await the task's completion, it can only terminate it.
129 pub struct CancelHandle {
130     raw: RawTask,
131 }
132 
133 impl CancelHandle {
new(raw: RawTask) -> CancelHandle134     pub(crate) fn new(raw: RawTask) -> CancelHandle {
135         raw.header().state.inc_ref();
136         CancelHandle { raw }
137     }
138 
139     /// Cancels the task associated with this handle.
140     ///
141     /// If the task has been already finished or it is currently running and
142     /// about to finish, then this method will do nothing.
cancel(&self)143     pub fn cancel(&self) {
144         unsafe { self.raw.cancel() }
145     }
146 
147     /// Checks whether the task associated with this handle has finished
148     /// executing.
is_finished(&self) -> bool149     pub fn is_finished(&self) -> bool {
150         let state = self.raw.header().state.get_current_state();
151         state::is_finished(state)
152     }
153 }
154 
155 impl Drop for CancelHandle {
drop(&mut self)156     fn drop(&mut self) {
157         self.raw.drop_ref()
158     }
159 }
160 
161 unsafe impl Send for CancelHandle {}
162 unsafe impl Sync for CancelHandle {}
163