• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (c) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
//! Join handle for asynchronous tasks.
//!
//! [`JoinHandle`] is similar to the `JoinHandle` of a thread. It can be
//! awaited to wait for an asynchronous task to finish and obtain its result.
18 
19 use std::future::Future;
20 use std::marker::PhantomData;
21 use std::pin::Pin;
22 use std::task::{Context, Poll, Waker};
23 
24 use crate::error::ScheduleError;
25 use crate::task::raw::RawTask;
26 use crate::task::state;
27 
28 /// A handle to the actual spawned task.
29 ///
30 /// This can be considered as the equivalent of [`std::thread::JoinHandle`]
31 /// for a ylong task rather than a thread.
32 ///
33 /// It could be used to join the corresponding task or cancel it.
34 /// If a `JoinHandle` is dropped, then the task continues executing in the
35 /// background and its return value is lost. There is no way to join the task
36 /// after its JoinHandle is dropped.
37 ///
38 /// # Examples
39 ///
40 /// ```
41 /// let handle = ylong_runtime::spawn(async {
42 ///     let handle2 = ylong_runtime::spawn(async { 1 });
43 ///     assert_eq!(handle2.await.unwrap(), 1);
44 /// });
45 /// ylong_runtime::block_on(handle).unwrap();
46 /// ```
47 pub struct JoinHandle<R> {
48     pub(crate) raw: RawTask,
49     marker: PhantomData<R>,
50 }
51 
52 unsafe impl<R: Send> Send for JoinHandle<R> {}
53 unsafe impl<R: Send> Sync for JoinHandle<R> {}
54 
55 impl<R> JoinHandle<R> {
new(raw: RawTask) -> JoinHandle<R>56     pub(crate) fn new(raw: RawTask) -> JoinHandle<R> {
57         JoinHandle {
58             raw,
59             marker: PhantomData,
60         }
61     }
62 
63     /// Cancels the task associating with this JoinHandle. If the task has
64     /// already finished, this method does nothing.
65     ///
66     /// When successfully canceled, `.await` on this JoinHandle will return a
67     /// `TaskCanceled` error.
cancel(&self)68     pub fn cancel(&self) {
69         unsafe {
70             self.raw.cancel();
71         }
72     }
73     /// Returns true if the task associated with this `JoinHandle` has finished.
is_finished(&self) -> bool74     pub fn is_finished(&self) -> bool {
75         state::is_finished(self.raw.header().state.get_current_state())
76     }
77 
get_cancel_handle(&self) -> CancelHandle78     pub(crate) fn get_cancel_handle(&self) -> CancelHandle {
79         CancelHandle::new(self.raw)
80     }
81 
set_waker(&mut self, waker: &Waker)82     pub(crate) fn set_waker(&mut self, waker: &Waker) {
83         let cur = self.raw.header().state.get_current_state();
84         unsafe {
85             if self
86                 .raw
87                 .set_waker(cur, (waker as *const Waker).cast::<()>())
88             {
89                 // Task already finished, wake the waker immediately
90                 waker.wake_by_ref();
91             }
92         }
93     }
94 }
95 
96 impl<R> Unpin for JoinHandle<R> {}
97 
98 impl<R> Future for JoinHandle<R> {
99     // The type of the output needs to match with Stage::StorageData
100     type Output = Result<R, ScheduleError>;
101 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>102     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
103         let mut res = Poll::Pending;
104 
105         let cur = self.raw.header().state.get_current_state();
106         assert!(
107             state::is_care_join_handle(cur),
108             "JoinHandle should not be polled after it's dropped"
109         );
110 
111         if state::is_finished(cur) {
112             unsafe {
113                 self.raw
114                     .get_result((&mut res as *mut Poll<Result<R, ScheduleError>>).cast::<()>());
115             }
116         } else {
117             unsafe {
118                 let is_finished = self.raw.set_waker(cur, cx.waker() as *const _ as *mut ());
119                 // Setting the waker may happen concurrently with task finishing.
120                 // Therefore we check one more time to see if the task is finished.
121                 if is_finished {
122                     self.raw
123                         .get_result((&mut res as *mut Poll<Result<R, ScheduleError>>).cast::<()>());
124                 }
125             }
126         }
127         res
128     }
129 }
130 
131 impl<R> Drop for JoinHandle<R> {
drop(&mut self)132     fn drop(&mut self) {
133         self.raw.drop_join_handle();
134     }
135 }
136 
137 /// A handle to cancel the spawned task.
138 ///
139 /// `CancelHandle` cannot await the task's completion, it can only terminate it.
140 pub struct CancelHandle {
141     raw: RawTask,
142 }
143 
144 impl CancelHandle {
new(raw: RawTask) -> CancelHandle145     pub(crate) fn new(raw: RawTask) -> CancelHandle {
146         raw.header().state.inc_ref();
147         CancelHandle { raw }
148     }
149 
150     /// Cancels the task associated with this handle.
151     ///
152     /// If the task has been already finished or it is currently running and
153     /// about to finish, then this method will do nothing.
cancel(&self)154     pub fn cancel(&self) {
155         unsafe { self.raw.cancel() }
156     }
157 
158     /// Checks whether the task associated with this handle has finished
159     /// executing.
is_finished(&self) -> bool160     pub fn is_finished(&self) -> bool {
161         let state = self.raw.header().state.get_current_state();
162         state::is_finished(state)
163     }
164 }
165 
166 impl Drop for CancelHandle {
drop(&mut self)167     fn drop(&mut self) {
168         self.raw.drop_ref()
169     }
170 }
171 
172 unsafe impl Send for CancelHandle {}
173 unsafe impl Sync for CancelHandle {}
174 
#[cfg(all(test, feature = "time"))]
mod test {
    use std::time::Duration;

    /// UT test cases for `is_finished` in `JoinHandle`.
    ///
    /// # Brief
    /// 1. Spawn one task that returns immediately and one that sleeps in an
    ///    endless loop.
    /// 2. Wait until the first handle reports completion.
    /// 3. Check that the first handle is finished and the second is not,
    ///    then cancel the second task.
    #[test]
    fn ut_test_join_handle_is_finished() {
        let poll_interval = Duration::from_millis(10);

        let done = crate::spawn(async { 1 });
        let endless = crate::spawn(async {
            loop {
                crate::time::sleep(Duration::from_millis(10)).await;
            }
        });

        // Block the test thread until the short task has completed.
        loop {
            if done.is_finished() {
                break;
            }
            std::thread::sleep(poll_interval);
        }

        assert!(done.is_finished());
        assert!(!endless.is_finished());
        endless.cancel();
    }

    /// UT test cases for `CancelHandle`.
    ///
    /// # Brief
    /// 1. Create a `CancelHandle` from a `JoinHandle`.
    /// 2. Spin until the `CancelHandle` reports that the task has finished.
    /// 3. Cancel through the `JoinHandle` and check that it still reports
    ///    the task as finished.
    #[test]
    fn ut_test_cancel_handle() {
        let join = crate::spawn(async { 1 });
        let canceller = join.get_cancel_handle();

        loop {
            if canceller.is_finished() {
                break;
            }
            std::hint::spin_loop();
        }

        join.cancel();
        assert!(join.is_finished());
    }
}
217