1 // Copyright (C) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 mod keeper; 15 mod running_task; 16 use std::collections::{HashMap, HashSet}; 17 use std::sync::atomic::{AtomicBool, Ordering}; 18 use std::sync::Arc; 19 20 use keeper::SAKeeper; 21 22 cfg_oh! { 23 use crate::ability::SYSTEM_CONFIG_MANAGER; 24 } 25 use ylong_runtime::task::JoinHandle; 26 27 use crate::config::Mode; 28 use crate::error::ErrorCode; 29 use crate::manage::database::RequestDb; 30 use crate::manage::events::{TaskEvent, TaskManagerEvent}; 31 use crate::manage::scheduler::qos::{QosChanges, QosDirection}; 32 use crate::manage::scheduler::queue::running_task::RunningTask; 33 use crate::manage::task_manager::TaskManagerTx; 34 use crate::service::client::ClientManagerEntry; 35 use crate::service::run_count::RunCountManagerEntry; 36 use crate::task::config::Action; 37 use crate::task::info::State; 38 use crate::task::reason::Reason; 39 use crate::task::request_task::RequestTask; 40 use crate::utils::runtime_spawn; 41 42 pub(crate) struct RunningQueue { 43 download_queue: HashMap<(u64, u32), Arc<RequestTask>>, 44 upload_queue: HashMap<(u64, u32), Arc<RequestTask>>, 45 running_tasks: HashMap<(u64, u32), Option<AbortHandle>>, 46 keeper: SAKeeper, 47 tx: TaskManagerTx, 48 run_count_manager: RunCountManagerEntry, 49 client_manager: ClientManagerEntry, 50 // paused and then resume upload task need to upload from the breakpoint 51 pub(crate) upload_resume: HashSet<u32>, 52 } 53 54 impl RunningQueue { new( tx: TaskManagerTx, run_count_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, ) -> Self55 pub(crate) fn new( 56 tx: TaskManagerTx, 57 run_count_manager: RunCountManagerEntry, 58 client_manager: ClientManagerEntry, 59 ) -> Self { 60 Self { 61 download_queue: HashMap::new(), 62 upload_queue: HashMap::new(), 63 keeper: SAKeeper::new(tx.clone()), 64 tx, 65 running_tasks: HashMap::new(), 66 run_count_manager, 67 client_manager, 68 upload_resume: HashSet::new(), 69 } 70 } 71 get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>>72 pub(crate) fn get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>> { 73 self.download_queue 74 .get(&(uid, task_id)) 75 .or_else(|| self.upload_queue.get(&(uid, task_id))) 76 } 77 task_finish(&mut self, uid: u64, task_id: u32)78 pub(crate) fn task_finish(&mut self, uid: u64, task_id: u32) { 79 self.running_tasks.remove(&(uid, task_id)); 80 } 81 try_restart(&mut self, uid: u64, task_id: u32) -> bool82 pub(crate) fn try_restart(&mut self, uid: u64, task_id: u32) -> bool { 83 if let Some(task) = self 84 .download_queue 85 .get(&(uid, task_id)) 86 .or(self.upload_queue.get(&(uid, task_id))) 87 { 88 if self.running_tasks.contains_key(&(uid, task_id)) { 89 return false; 90 } 91 info!("{} restart running", task_id); 92 let running_task = RunningTask::new(task.clone(), self.tx.clone(), self.keeper.clone()); 93 let abort_flag = Arc::new(AtomicBool::new(false)); 94 let abort_flag_clone = abort_flag.clone(); 95 let join_handle = runtime_spawn(async move { 96 running_task.run(abort_flag_clone.clone()).await; 97 }); 98 let uid = task.uid(); 99 let task_id = task.task_id(); 100 self.running_tasks.insert( 101 (uid, task_id), 102 Some(AbortHandle::new(abort_flag, join_handle)), 103 ); 104 true 105 } else { 106 false 107 } 108 } 109 tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>>110 pub(crate) fn tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>> { 111 self.download_queue 112 .values() 113 .chain(self.upload_queue.values()) 114 } 115 running_tasks(&self) -> usize116 pub(crate) fn running_tasks(&self) -> usize { 117 self.download_queue.len() + self.upload_queue.len() 118 } 119 dump_tasks(&self)120 pub(crate) fn dump_tasks(&self) { 121 info!("dump all running {}", self.running_tasks()); 122 123 for ((uid, task_id), task) in self.download_queue.iter().chain(self.upload_queue.iter()) { 124 let task_status = task.status.lock().unwrap(); 125 info!( 126 "dump task {}, uid {}, action {}, mode {}, bundle {}, status {:?}", 127 task_id, 128 uid, 129 task.action().repr, 130 task.mode().repr, 131 task.bundle(), 132 *task_status 133 ); 134 } 135 } 136 reschedule(&mut self, qos: QosChanges, qos_remove_queue: &mut Vec<(u64, u32)>)137 pub(crate) fn reschedule(&mut self, qos: QosChanges, qos_remove_queue: &mut Vec<(u64, u32)>) { 138 if let Some(vec) = qos.download { 139 self.reschedule_inner(Action::Download, vec, qos_remove_queue) 140 } 141 if let Some(vec) = qos.upload { 142 self.reschedule_inner(Action::Upload, vec, qos_remove_queue) 143 } 144 } 145 reschedule_inner( &mut self, action: Action, qos_vec: Vec<QosDirection>, qos_remove_queue: &mut Vec<(u64, u32)>, )146 pub(crate) fn reschedule_inner( 147 &mut self, 148 action: Action, 149 qos_vec: Vec<QosDirection>, 150 qos_remove_queue: &mut Vec<(u64, u32)>, 151 ) { 152 let mut new_queue = HashMap::new(); 153 154 let queue = if action == Action::Download { 155 &mut self.download_queue 156 } else { 157 &mut self.upload_queue 158 }; 159 160 // We need to decide which tasks need to continue running based on `QosChanges`. 161 for qos_direction in qos_vec.iter() { 162 let uid = qos_direction.uid(); 163 let task_id = qos_direction.task_id(); 164 165 if let Some(task) = queue.remove(&(uid, task_id)) { 166 // If we can find that the task is running in `running_tasks`, 167 // we just need to adjust its rate. 168 task.speed_limit(qos_direction.direction() as u64); 169 // Then we put it into `satisfied_tasks`. 170 new_queue.insert((uid, task_id), task); 171 continue; 172 } 173 174 // If the task is not in the current running queue, retrieve 175 // the corresponding task from the database and start it. 176 177 #[cfg(feature = "oh")] 178 let system_config = unsafe { SYSTEM_CONFIG_MANAGER.assume_init_ref().system_config() }; 179 let upload_resume = self.upload_resume.remove(&task_id); 180 181 let task = match RequestDb::get_instance().get_task( 182 task_id, 183 #[cfg(feature = "oh")] 184 system_config, 185 &self.client_manager, 186 upload_resume, 187 ) { 188 Ok(task) => task, 189 Err(ErrorCode::TaskNotFound) => continue, // If we cannot find the task, skip it. 190 Err(ErrorCode::TaskStateErr) => continue, // If we cannot find the task, skip it. 191 Err(e) => { 192 info!("get task {} error:{:?}", task_id, e); 193 if let Some(info) = RequestDb::get_instance().get_task_qos_info(task_id) { 194 self.tx.send_event(TaskManagerEvent::Task(TaskEvent::Failed( 195 task_id, 196 uid, 197 Reason::OthersError, 198 Mode::from(info.mode), 199 ))); 200 } 201 qos_remove_queue.push((uid, task_id)); 202 continue; 203 } 204 }; 205 task.speed_limit(qos_direction.direction() as u64); 206 207 new_queue.insert((uid, task_id), task.clone()); 208 209 if self.running_tasks.contains_key(&(uid, task_id)) { 210 info!("task {} not finished", task_id); 211 continue; 212 } 213 214 info!("{} create running", task_id); 215 let running_task = RunningTask::new(task.clone(), self.tx.clone(), self.keeper.clone()); 216 RequestDb::get_instance().update_task_state( 217 running_task.task_id(), 218 State::Running, 219 Reason::Default, 220 ); 221 let abort_flag = Arc::new(AtomicBool::new(false)); 222 let abort_flag_clone = abort_flag.clone(); 223 let join_handle = runtime_spawn(async move { 224 running_task.run(abort_flag_clone).await; 225 }); 226 227 let uid = task.uid(); 228 let task_id = task.task_id(); 229 self.running_tasks.insert( 230 (uid, task_id), 231 Some(AbortHandle::new(abort_flag, join_handle)), 232 ); 233 } 234 // every satisfied tasks in running has been moved, set left tasks to Waiting 235 236 for task in queue.values() { 237 if let Some(join_handle) = self.running_tasks.get_mut(&(task.uid(), task.task_id())) { 238 if let Some(join_handle) = join_handle.take() { 239 join_handle.cancel(); 240 }; 241 } 242 } 243 *queue = new_queue; 244 245 #[cfg(feature = "oh")] 246 self.run_count_manager 247 .notify_run_count(self.download_queue.len() + self.upload_queue.len()); 248 } 249 retry_all_tasks(&mut self)250 pub(crate) fn retry_all_tasks(&mut self) { 251 for task in self.running_tasks.drain() { 252 if let Some(handle) = task.1 { 253 handle.cancel(); 254 } 255 } 256 } 257 } 258 259 struct AbortHandle { 260 abort_flag: Arc<AtomicBool>, 261 join_handle: JoinHandle<()>, 262 } 263 264 impl AbortHandle { new(abort_flag: Arc<AtomicBool>, join_handle: JoinHandle<()>) -> Self265 fn new(abort_flag: Arc<AtomicBool>, join_handle: JoinHandle<()>) -> Self { 266 Self { 267 abort_flag, 268 join_handle, 269 } 270 } cancel(self)271 fn cancel(self) { 272 self.abort_flag.store(true, Ordering::Release); 273 self.join_handle.cancel(); 274 } 275 } 276