1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! Joinhandle for asynchronous tasks. 15 //! 16 //! [`JoinHandle`] is similar to a JoinHandle for a thread. It could be used to 17 //! await an asynchronous task to finish to get its result. 18 19 use std::future::Future; 20 use std::marker::PhantomData; 21 use std::pin::Pin; 22 use std::task::{Context, Poll, Waker}; 23 24 use crate::error::ScheduleError; 25 use crate::task::raw::RawTask; 26 use crate::task::state; 27 28 /// A handle to the actual spawned task. 29 /// 30 /// This can be considered as the equivalent of [`std::thread::JoinHandle`] 31 /// for a ylong task rather than a thread. 32 /// 33 /// It could be used to join the corresponding task or cancel it. 34 /// If a `JoinHandle` is dropped, then the task continues executing in the 35 /// background and its return value is lost. There is no way to join the task 36 /// after its JoinHandle is dropped. 37 /// 38 /// # Examples 39 /// 40 /// ``` 41 /// let handle = ylong_runtime::spawn(async { 42 /// let handle2 = ylong_runtime::spawn(async { 1 }); 43 /// assert_eq!(handle2.await.unwrap(), 1); 44 /// }); 45 /// ylong_runtime::block_on(handle).unwrap(); 46 /// ``` 47 pub struct JoinHandle<R> { 48 pub(crate) raw: RawTask, 49 marker: PhantomData<R>, 50 } 51 52 unsafe impl<R: Send> Send for JoinHandle<R> {} 53 unsafe impl<R: Send> Sync for JoinHandle<R> {} 54 55 impl<R> JoinHandle<R> { new(raw: RawTask) -> JoinHandle<R>56 pub(crate) fn new(raw: RawTask) -> JoinHandle<R> { 57 JoinHandle { 58 raw, 59 marker: PhantomData, 60 } 61 } 62 63 /// Cancels the task associating with this JoinHandle. If the task has 64 /// already finished, this method does nothing. 65 /// 66 /// When successfully canceled, `.await` on this JoinHandle will return a 67 /// `TaskCanceled` error. cancel(&self)68 pub fn cancel(&self) { 69 unsafe { 70 self.raw.cancel(); 71 } 72 } 73 /// Returns true if the task associated with this `JoinHandle` has finished. is_finished(&self) -> bool74 pub fn is_finished(&self) -> bool { 75 state::is_finished(self.raw.header().state.get_current_state()) 76 } 77 get_cancel_handle(&self) -> CancelHandle78 pub(crate) fn get_cancel_handle(&self) -> CancelHandle { 79 CancelHandle::new(self.raw) 80 } 81 set_waker(&mut self, waker: &Waker)82 pub(crate) fn set_waker(&mut self, waker: &Waker) { 83 let cur = self.raw.header().state.get_current_state(); 84 unsafe { 85 if self 86 .raw 87 .set_waker(cur, (waker as *const Waker).cast::<()>()) 88 { 89 // Task already finished, wake the waker immediately 90 waker.wake_by_ref(); 91 } 92 } 93 } 94 } 95 96 impl<R> Unpin for JoinHandle<R> {} 97 98 impl<R> Future for JoinHandle<R> { 99 // The type of the output needs to match with Stage::StorageData 100 type Output = Result<R, ScheduleError>; 101 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>102 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 103 let mut res = Poll::Pending; 104 105 let cur = self.raw.header().state.get_current_state(); 106 assert!( 107 state::is_care_join_handle(cur), 108 "JoinHandle should not be polled after it's dropped" 109 ); 110 111 if state::is_finished(cur) { 112 unsafe { 113 self.raw 114 .get_result((&mut res as *mut Poll<Result<R, ScheduleError>>).cast::<()>()); 115 } 116 } else { 117 unsafe { 118 let is_finished = self.raw.set_waker(cur, cx.waker() as *const _ as *mut ()); 119 // Setting the waker may happen concurrently with task finishing. 120 // Therefore we check one more time to see if the task is finished. 121 if is_finished { 122 self.raw 123 .get_result((&mut res as *mut Poll<Result<R, ScheduleError>>).cast::<()>()); 124 } 125 } 126 } 127 res 128 } 129 } 130 131 impl<R> Drop for JoinHandle<R> { drop(&mut self)132 fn drop(&mut self) { 133 self.raw.drop_join_handle(); 134 } 135 } 136 137 /// A handle to cancel the spawned task. 138 /// 139 /// `CancelHandle` cannot await the task's completion, it can only terminate it. 140 pub struct CancelHandle { 141 raw: RawTask, 142 } 143 144 impl CancelHandle { new(raw: RawTask) -> CancelHandle145 pub(crate) fn new(raw: RawTask) -> CancelHandle { 146 raw.header().state.inc_ref(); 147 CancelHandle { raw } 148 } 149 150 /// Cancels the task associated with this handle. 151 /// 152 /// If the task has been already finished or it is currently running and 153 /// about to finish, then this method will do nothing. cancel(&self)154 pub fn cancel(&self) { 155 unsafe { self.raw.cancel() } 156 } 157 158 /// Checks whether the task associated with this handle has finished 159 /// executing. is_finished(&self) -> bool160 pub fn is_finished(&self) -> bool { 161 let state = self.raw.header().state.get_current_state(); 162 state::is_finished(state) 163 } 164 } 165 166 impl Drop for CancelHandle { drop(&mut self)167 fn drop(&mut self) { 168 self.raw.drop_ref() 169 } 170 } 171 172 unsafe impl Send for CancelHandle {} 173 unsafe impl Sync for CancelHandle {} 174 175 #[cfg(all(test, feature = "time"))] 176 mod test { 177 use std::time::Duration; 178 179 /// UT test cases for `is_finished` in `JoinHandle`. 180 /// 181 /// # Brief 182 /// 1. create two JoinHandle 183 /// 2. check the correctness of the JoinHandle for completion 184 #[test] ut_test_join_handle_is_finished()185 fn ut_test_join_handle_is_finished() { 186 let handle1 = crate::spawn(async { 1 }); 187 188 let handle2 = crate::spawn(async { 189 loop { 190 crate::time::sleep(Duration::from_millis(10)).await; 191 } 192 }); 193 while !handle1.is_finished() { 194 std::thread::sleep(Duration::from_millis(10)); 195 } 196 assert!(handle1.is_finished()); 197 assert!(!handle2.is_finished()); 198 handle2.cancel(); 199 } 200 201 /// UT test cases for `CancelHandle`. 202 /// 203 /// # Brief 204 /// 1. create a CancelHandle with JoinHandle 205 /// 2. check the correctness of the JoinHandle for completion 206 #[test] ut_test_cancel_handle()207 fn ut_test_cancel_handle() { 208 let handle = crate::spawn(async { 1 }); 209 let cancel = handle.get_cancel_handle(); 210 while !cancel.is_finished() { 211 std::hint::spin_loop(); 212 } 213 handle.cancel(); 214 assert!(handle.is_finished()); 215 } 216 } 217