1 // Copyright (C) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 mod qos; 15 mod queue; 16 pub(crate) mod state; 17 use std::collections::HashMap; 18 use std::sync::Arc; 19 mod sql; 20 use qos::Qos; 21 use queue::RunningQueue; 22 use state::sql::SqlList; 23 24 use super::events::TaskManagerEvent; 25 use crate::config::Mode; 26 use crate::error::ErrorCode; 27 use crate::info::TaskInfo; 28 use crate::manage::database::RequestDb; 29 use crate::manage::notifier::Notifier; 30 use crate::manage::task_manager::TaskManagerTx; 31 use crate::service::client::ClientManagerEntry; 32 use crate::service::notification_bar::NotificationDispatcher; 33 use crate::service::run_count::RunCountManagerEntry; 34 use crate::task::config::Action; 35 use crate::task::info::State; 36 use crate::task::reason::Reason; 37 use crate::task::request_task::RequestTask; 38 use crate::utils::get_current_timestamp; 39 40 const MILLISECONDS_IN_ONE_MONTH: u64 = 30 * 24 * 60 * 60 * 1000; 41 42 // Scheduler 的基本处理逻辑如下: 43 // 1. Scheduler 维护一个当前所有 运行中 和 44 // 待运行的任务优先级队列(scheduler.qos), 45 // 该队列仅保存任务的优先级信息和基础信息,当环境发生变化时, 46 // 将该优先级队列重新排序,并得到一系列优先级调节指令(QosChange), 47 // 这些指令的作用是指引运行队列将满足优先级排序的任务变为运行状态。 48 // 49 // 2. 得到指令后,将该指令作用于任务队列(scheduler.queue)。 50 // 任务队列保存当前正在运行的任务列表(scheduler.queue.running), 51 // 所以运行队列根据指令的内容, 将指令引导的那些任务置于运行任务列表, 52 // 并调节速率。对于那些当前正在执行,但此时又未得到运行权限的任务, 53 // 我们将其修改为Waiting状态,运行任务队列就更新完成了。 54 // 55 // 注意:未处于运行状态中的任务不会停留在内存中。 56 57 pub(crate) struct Scheduler { 58 qos: Qos, 59 running_queue: RunningQueue, 60 client_manager: ClientManagerEntry, 61 state_handler: state::Handler, 62 pub(crate) resort_scheduled: bool, 63 task_manager: TaskManagerTx, 64 } 65 66 impl Scheduler { init( tx: TaskManagerTx, runcount_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, ) -> Scheduler67 pub(crate) fn init( 68 tx: TaskManagerTx, 69 runcount_manager: RunCountManagerEntry, 70 client_manager: ClientManagerEntry, 71 ) -> Scheduler { 72 let mut state_handler = state::Handler::new(tx.clone()); 73 let sql_list = state_handler.init(); 74 let db = RequestDb::get_instance(); 75 for sql in sql_list { 76 if let Err(e) = db.execute(&sql) { 77 error!("TaskManager update network failed {:?}", e); 78 }; 79 } 80 81 Self { 82 qos: Qos::new(), 83 running_queue: RunningQueue::new(tx.clone(), runcount_manager, client_manager.clone()), 84 client_manager, 85 state_handler, 86 resort_scheduled: false, 87 task_manager: tx, 88 } 89 } 90 get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>>91 pub(crate) fn get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>> { 92 self.running_queue.get_task(uid, task_id) 93 } 94 tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>>95 pub(crate) fn tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>> { 96 self.running_queue.tasks() 97 } 98 running_tasks(&self) -> usize99 pub(crate) fn running_tasks(&self) -> usize { 100 self.running_queue.running_tasks() 101 } 102 dump_tasks(&self)103 pub(crate) fn dump_tasks(&self) { 104 self.running_queue.dump_tasks(); 105 } 106 restore_all_tasks(&mut self)107 pub(crate) fn restore_all_tasks(&mut self) { 108 info!("reschedule restore all tasks"); 109 // Reschedule tasks based on the current `QOS` status. 110 self.schedule_if_not_scheduled(); 111 } 112 start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>113 pub(crate) fn start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 114 self.start_inner(uid, task_id, false) 115 } 116 resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>117 pub(crate) fn resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 118 self.start_inner(uid, task_id, true) 119 } 120 start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode>121 fn start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode> { 122 let database = RequestDb::get_instance(); 123 let info = RequestDb::get_instance() 124 .get_task_info(task_id) 125 .ok_or(ErrorCode::TaskNotFound)?; 126 127 if (is_resume && info.progress.common_data.state != State::Paused.repr) 128 || (!is_resume && info.progress.common_data.state == State::Paused.repr) 129 { 130 return Err(ErrorCode::TaskStateErr); 131 } 132 // Change `Waiting` so that it can be scheduled. 133 database.change_status(task_id, State::Waiting)?; 134 135 let info = RequestDb::get_instance() 136 .get_task_info(task_id) 137 .ok_or(ErrorCode::TaskNotFound)?; 138 if is_resume { 139 Notifier::resume(&self.client_manager, info.build_notify_data()); 140 } 141 142 if info.progress.is_finish() { 143 database.update_task_state(task_id, State::Completed, Reason::Default); 144 if let Some(info) = database.get_task_info(task_id) { 145 Notifier::complete(&self.client_manager, info.build_notify_data()); 146 } 147 } 148 149 if !self.check_config_satisfy(task_id)? { 150 return Ok(()); 151 }; 152 let qos_info = database 153 .get_task_qos_info(task_id) 154 .ok_or(ErrorCode::TaskNotFound)?; 155 self.qos.start_task(uid, qos_info); 156 self.schedule_if_not_scheduled(); 157 Ok(()) 158 } 159 pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>160 pub(crate) fn pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 161 let database = RequestDb::get_instance(); 162 database.change_status(task_id, State::Paused)?; 163 164 if let Some(info) = database.get_task_info(task_id) { 165 Notifier::pause(&self.client_manager, info.build_notify_data()); 166 } 167 self.running_queue.upload_resume.insert(task_id); 168 if self.qos.remove_task(uid, task_id) { 169 self.schedule_if_not_scheduled(); 170 } 171 Ok(()) 172 } 173 remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>174 pub(crate) fn remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 175 let database = RequestDb::get_instance(); 176 database.change_status(task_id, State::Removed)?; 177 let info = database 178 .get_task_info(task_id) 179 .ok_or(ErrorCode::TaskNotFound)?; 180 181 Notifier::remove(&self.client_manager, info.build_notify_data()); 182 183 if self.qos.remove_task(uid, task_id) { 184 self.schedule_if_not_scheduled(); 185 } 186 Ok(()) 187 } 188 stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>189 pub(crate) fn stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 190 let database = RequestDb::get_instance(); 191 database.change_status(task_id, State::Stopped)?; 192 193 if self.qos.remove_task(uid, task_id) { 194 self.schedule_if_not_scheduled(); 195 } 196 Ok(()) 197 } 198 task_completed(&mut self, uid: u64, task_id: u32)199 pub(crate) fn task_completed(&mut self, uid: u64, task_id: u32) { 200 info!("scheduler task {} completed", task_id); 201 self.running_queue.task_finish(uid, task_id); 202 203 let database = RequestDb::get_instance(); 204 if self.qos.remove_task(uid, task_id) { 205 self.schedule_if_not_scheduled(); 206 } 207 208 if let Some(info) = database.get_task_qos_info(task_id) { 209 if info.state == State::Failed.repr { 210 if let Some(task_info) = database.get_task_info(task_id) { 211 Scheduler::notify_fail(task_info, &self.client_manager); 212 return; 213 } 214 } 215 216 if info.state != State::Running.repr && info.state != State::Waiting.repr { 217 return; 218 } 219 } 220 221 database.update_task_state(task_id, State::Completed, Reason::Default); 222 if let Some(info) = database.get_task_info(task_id) { 223 Notifier::complete(&self.client_manager, info.build_notify_data()); 224 NotificationDispatcher::get_instance().publish_success_notification(&info); 225 } 226 } 227 task_cancel( &mut self, uid: u64, task_id: u32, mode: Mode, task_count: &mut HashMap<u64, (usize, usize)>, )228 pub(crate) fn task_cancel( 229 &mut self, 230 uid: u64, 231 task_id: u32, 232 mode: Mode, 233 task_count: &mut HashMap<u64, (usize, usize)>, 234 ) { 235 info!("scheduler task {} canceled", task_id); 236 self.running_queue.task_finish(uid, task_id); 237 if self.running_queue.try_restart(uid, task_id) { 238 return; 239 } 240 let database = RequestDb::get_instance(); 241 let Some(info) = database.get_task_info(task_id) else { 242 error!("task {} not found in database", task_id); 243 NotificationDispatcher::get_instance().unregister_task(uid, task_id); 244 return; 245 }; 246 match State::from(info.progress.common_data.state) { 247 State::Running | State::Retrying => { 248 info!("task {} waiting for task limits", task_id); 249 RequestDb::get_instance().update_task_state( 250 task_id, 251 State::Waiting, 252 Reason::RunningTaskMeetLimits, 253 ); 254 } 255 State::Failed => { 256 info!("task {} cancel with state Failed", task_id); 257 Scheduler::reduce_task_count(uid, mode, task_count); 258 Scheduler::notify_fail(info, &self.client_manager); 259 } 260 State::Stopped | State::Removed => { 261 info!("task {} cancel with state Stopped or Removed", task_id); 262 NotificationDispatcher::get_instance().unregister_task(uid, task_id); 263 } 264 state => { 265 info!( 266 "task {} cancel state {:?} reason {:?}", 267 task_id, 268 state, 269 Reason::from(info.common_data.reason) 270 ); 271 } 272 } 273 } 274 task_failed(&mut self, uid: u64, task_id: u32, reason: Reason)275 pub(crate) fn task_failed(&mut self, uid: u64, task_id: u32, reason: Reason) { 276 info!("scheduler task {} failed", task_id); 277 self.running_queue.task_finish(uid, task_id); 278 279 let database = RequestDb::get_instance(); 280 281 if self.qos.remove_task(uid, task_id) { 282 self.schedule_if_not_scheduled(); 283 } 284 285 if let Some(info) = database.get_task_qos_info(task_id) { 286 if info.state != State::Running.repr && info.state != State::Waiting.repr { 287 return; 288 } 289 } 290 291 database.update_task_state(task_id, State::Failed, reason); 292 if let Some(info) = database.get_task_info(task_id) { 293 Scheduler::notify_fail(info, &self.client_manager); 294 } 295 } 296 notify_fail(info: TaskInfo, client_manager: &ClientManagerEntry)297 fn notify_fail(info: TaskInfo, client_manager: &ClientManagerEntry) { 298 Notifier::fail(client_manager, info.build_notify_data()); 299 NotificationDispatcher::get_instance().publish_failed_notification(&info); 300 #[cfg(feature = "oh")] 301 Self::sys_event(info); 302 } 303 reduce_task_count( uid: u64, mode: Mode, task_count: &mut HashMap<u64, (usize, usize)>, )304 pub(crate) fn reduce_task_count( 305 uid: u64, 306 mode: Mode, 307 task_count: &mut HashMap<u64, (usize, usize)>, 308 ) { 309 if let Some((front, back)) = task_count.get_mut(&uid) { 310 match mode { 311 Mode::FrontEnd => { 312 if *front > 0 { 313 *front -= 1; 314 } 315 } 316 _ => { 317 if *back > 0 { 318 *back -= 1; 319 } 320 } 321 } 322 } 323 } 324 325 #[cfg(feature = "oh")] sys_event(info: TaskInfo)326 pub(crate) fn sys_event(info: TaskInfo) { 327 use hisysevent::{build_number_param, build_str_param}; 328 329 use crate::sys_event::SysEvent; 330 331 let index = info.progress.common_data.index; 332 let size = info.file_specs.len(); 333 let action = match info.action() { 334 Action::Download => "DOWNLOAD", 335 Action::Upload => "UPLOAD", 336 _ => "UNKNOWN", 337 }; 338 let reason = Reason::from(info.common_data.reason); 339 340 SysEvent::task_fault() 341 .param(build_str_param!(crate::sys_event::TASKS_TYPE, action)) 342 .param(build_number_param!( 343 crate::sys_event::TOTAL_FILE_NUM, 344 size as i32 345 )) 346 .param(build_number_param!( 347 crate::sys_event::FAIL_FILE_NUM, 348 (size - index) as i32 349 )) 350 .param(build_number_param!( 351 crate::sys_event::SUCCESS_FILE_NUM, 352 index as i32 353 )) 354 .param(build_number_param!( 355 crate::sys_event::ERROR_INFO, 356 reason.repr as i32 357 )) 358 .write(); 359 } 360 on_state_change<T, F>(&mut self, f: F, t: T) where F: FnOnce(&mut state::Handler, T) -> Option<SqlList>,361 pub(crate) fn on_state_change<T, F>(&mut self, f: F, t: T) 362 where 363 F: FnOnce(&mut state::Handler, T) -> Option<SqlList>, 364 { 365 let Some(sql_list) = f(&mut self.state_handler, t) else { 366 return; 367 }; 368 let db = RequestDb::get_instance(); 369 for sql in sql_list { 370 if let Err(e) = db.execute(&sql) { 371 error!("TaskManager update network failed {:?}", e); 372 }; 373 } 374 self.reload_all_tasks(); 375 } 376 reload_all_tasks(&mut self)377 pub(crate) fn reload_all_tasks(&mut self) { 378 self.qos.reload_all_tasks(); 379 self.schedule_if_not_scheduled(); 380 } 381 on_rss_change(&mut self, level: i32)382 pub(crate) fn on_rss_change(&mut self, level: i32) { 383 if let Some(new_rss) = self.state_handler.update_rss_level(level) { 384 self.qos.change_rss(new_rss); 385 self.schedule_if_not_scheduled(); 386 } 387 } 388 schedule_if_not_scheduled(&mut self)389 fn schedule_if_not_scheduled(&mut self) { 390 if self.resort_scheduled { 391 return; 392 } 393 self.resort_scheduled = true; 394 let task_manager = self.task_manager.clone(); 395 task_manager.send_event(TaskManagerEvent::Reschedule); 396 } 397 reschedule(&mut self)398 pub(crate) fn reschedule(&mut self) { 399 self.resort_scheduled = false; 400 let changes = self.qos.reschedule(&self.state_handler); 401 let mut qos_remove_queue = vec![]; 402 self.running_queue 403 .reschedule(changes, &mut qos_remove_queue); 404 for (uid, task_id) in qos_remove_queue.iter() { 405 self.qos.apps.remove_task(*uid, *task_id); 406 } 407 if !qos_remove_queue.is_empty() { 408 self.reload_all_tasks(); 409 } 410 } 411 check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode>412 pub(crate) fn check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode> { 413 let database = RequestDb::get_instance(); 414 let config = database 415 .get_task_config(task_id) 416 .ok_or(ErrorCode::TaskNotFound)?; 417 418 if let Err(reason) = config.satisfy_network(self.state_handler.network()) { 419 info!( 420 "task {} started, waiting for network {:?}", 421 task_id, 422 self.state_handler.network() 423 ); 424 425 database.update_task_state(task_id, State::Waiting, reason); 426 return Ok(false); 427 } 428 429 if !config.satisfy_foreground(self.state_handler.foreground_abilities()) { 430 info!( 431 "task {} started, waiting for app {}", 432 task_id, config.common_data.uid 433 ); 434 database.update_task_state(task_id, State::Waiting, Reason::AppBackgroundOrTerminate); 435 return Ok(false); 436 } 437 Ok(true) 438 } 439 clear_timeout_tasks(&mut self)440 pub(crate) fn clear_timeout_tasks(&mut self) { 441 let current_time = get_current_timestamp(); 442 let timeout_tasks = self 443 .tasks() 444 .filter(|task| current_time - task.ctime > MILLISECONDS_IN_ONE_MONTH) 445 .cloned() 446 .collect::<Vec<_>>(); 447 if timeout_tasks.is_empty() { 448 return; 449 } 450 let database = RequestDb::get_instance(); 451 for task in timeout_tasks { 452 if database 453 .change_status(task.task_id(), State::Stopped) 454 .is_ok() 455 { 456 self.qos.apps.remove_task(task.uid(), task.task_id()); 457 } 458 } 459 self.schedule_if_not_scheduled(); 460 } 461 retry_all_tasks(&mut self)462 pub(crate) fn retry_all_tasks(&mut self) { 463 self.running_queue.retry_all_tasks(); 464 } 465 } 466 467 impl RequestDb { change_status(&self, task_id: u32, new_state: State) -> Result<(), ErrorCode>468 fn change_status(&self, task_id: u32, new_state: State) -> Result<(), ErrorCode> { 469 let info = RequestDb::get_instance() 470 .get_task_info(task_id) 471 .ok_or(ErrorCode::TaskNotFound)?; 472 473 let old_state = info.progress.common_data.state; 474 if old_state == new_state.repr { 475 if new_state == State::Removed { 476 return Err(ErrorCode::TaskNotFound); 477 } else { 478 return Err(ErrorCode::TaskStateErr); 479 } 480 } 481 let sql = match new_state { 482 State::Paused => sql::pause_task(task_id), 483 State::Running => sql::start_task(task_id), 484 State::Stopped => sql::stop_task(task_id), 485 State::Removed => sql::remove_task(task_id), 486 State::Waiting => sql::start_task(task_id), 487 _ => return Err(ErrorCode::Other), 488 }; 489 490 RequestDb::get_instance() 491 .execute(&sql) 492 .map_err(|_| ErrorCode::SystemApi)?; 493 494 let info = RequestDb::get_instance() 495 .get_task_info(task_id) 496 .ok_or(ErrorCode::SystemApi)?; 497 if info.progress.common_data.state != new_state.repr { 498 return Err(ErrorCode::TaskStateErr); 499 } 500 501 if (old_state == State::Initialized.repr 502 || old_state == State::Waiting.repr 503 || old_state == State::Paused.repr) 504 && (new_state == State::Stopped || new_state == State::Removed) 505 { 506 NotificationDispatcher::get_instance().unregister_task(info.uid(), task_id); 507 } 508 Ok(()) 509 } 510 } 511