1 // Copyright (c) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 //! Joinhandle for asynchronous tasks. 15 //! 16 //! [`JoinHandle`] is similar to a JoinHandle for a thread. It could be used to 17 //! await an asynchronous task to finish to get its result. 18 19 use std::future::Future; 20 use std::marker::PhantomData; 21 use std::pin::Pin; 22 use std::task::{Context, Poll, Waker}; 23 24 use crate::error::ScheduleError; 25 use crate::task::raw::RawTask; 26 use crate::task::state; 27 28 /// A handle to the actual spawned task. 29 /// 30 /// This can be considered as the equivalent of [`std::thread::JoinHandle`] 31 /// for a ylong task rather than a thread. 32 /// 33 /// It could be used to join the corresponding task or cancel it. 34 /// If a `JoinHandle` is dropped, then the task continues executing in the 35 /// background and its return value is lost. There is no way to join the task 36 /// after its JoinHandle is dropped. 37 /// 38 /// # Examples 39 /// 40 /// ``` 41 /// let handle = ylong_runtime::spawn(async { 42 /// let handle2 = ylong_runtime::spawn(async { 1 }); 43 /// assert_eq!(handle2.await.unwrap(), 1); 44 /// }); 45 /// ylong_runtime::block_on(handle).unwrap(); 46 /// ``` 47 pub struct JoinHandle<R> { 48 pub(crate) raw: RawTask, 49 marker: PhantomData<R>, 50 } 51 52 unsafe impl<R: Send> Send for JoinHandle<R> {} 53 unsafe impl<R: Send> Sync for JoinHandle<R> {} 54 55 impl<R> JoinHandle<R> { new(raw: RawTask) -> JoinHandle<R>56 pub(crate) fn new(raw: RawTask) -> JoinHandle<R> { 57 JoinHandle { 58 raw, 59 marker: PhantomData, 60 } 61 } 62 63 /// Cancels the task associating with this JoinHandle. If the task has 64 /// already finished, this method does nothing. 65 /// 66 /// When successfully canceled, `.await` on this JoinHandle will return a 67 /// `TaskCanceled` error. cancel(&self)68 pub fn cancel(&self) { 69 unsafe { 70 self.raw.cancel(); 71 } 72 } 73 get_cancel_handle(&self) -> CancelHandle74 pub(crate) fn get_cancel_handle(&self) -> CancelHandle { 75 CancelHandle::new(self.raw) 76 } 77 set_waker(&mut self, waker: &Waker)78 pub(crate) fn set_waker(&mut self, waker: &Waker) { 79 let cur = self.raw.header().state.get_current_state(); 80 unsafe { 81 if self.raw.set_waker(cur, waker as *const Waker as *const ()) { 82 // Task already finished, wake the waker immediately 83 waker.wake_by_ref(); 84 } 85 } 86 } 87 } 88 89 impl<R> Unpin for JoinHandle<R> {} 90 91 impl<R> Future for JoinHandle<R> { 92 // The type of the output needs to match with Stage::StorageData 93 type Output = Result<R, ScheduleError>; 94 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>95 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 96 let mut res = Poll::Pending; 97 98 let cur = self.raw.header().state.get_current_state(); 99 if !state::is_care_join_handle(cur) { 100 panic!("JoinHandle should not be polled after it's dropped"); 101 } 102 if state::is_finished(cur) { 103 unsafe { 104 self.raw.get_result(&mut res as *mut _ as *mut ()); 105 } 106 } else { 107 unsafe { 108 let is_finished = self.raw.set_waker(cur, cx.waker() as *const _ as *mut ()); 109 // Setting the waker may happen concurrently with task finishing. 110 // Therefore we check one more time to see if the task is finished. 111 if is_finished { 112 self.raw.get_result(&mut res as *mut _ as *mut ()); 113 } 114 } 115 } 116 res 117 } 118 } 119 120 impl<R> Drop for JoinHandle<R> { drop(&mut self)121 fn drop(&mut self) { 122 self.raw.drop_join_handle(); 123 } 124 } 125 126 /// A handle to cancel the spawned task. 127 /// 128 /// `CancelHandle` cannot await the task's completion, it can only terminate it. 129 pub struct CancelHandle { 130 raw: RawTask, 131 } 132 133 impl CancelHandle { new(raw: RawTask) -> CancelHandle134 pub(crate) fn new(raw: RawTask) -> CancelHandle { 135 raw.header().state.inc_ref(); 136 CancelHandle { raw } 137 } 138 139 /// Cancels the task associated with this handle. 140 /// 141 /// If the task has been already finished or it is currently running and 142 /// about to finish, then this method will do nothing. cancel(&self)143 pub fn cancel(&self) { 144 unsafe { self.raw.cancel() } 145 } 146 147 /// Checks whether the task associated with this handle has finished 148 /// executing. is_finished(&self) -> bool149 pub fn is_finished(&self) -> bool { 150 let state = self.raw.header().state.get_current_state(); 151 state::is_finished(state) 152 } 153 } 154 155 impl Drop for CancelHandle { drop(&mut self)156 fn drop(&mut self) { 157 self.raw.drop_ref() 158 } 159 } 160 161 unsafe impl Send for CancelHandle {} 162 unsafe impl Sync for CancelHandle {} 163