1 // Copyright (C) 2023 Huawei Device Co., Ltd. 2 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // you may not use this file except in compliance with the License. 4 // You may obtain a copy of the License at 5 // 6 // http://www.apache.org/licenses/LICENSE-2.0 7 // 8 // Unless required by applicable law or agreed to in writing, software 9 // distributed under the License is distributed on an "AS IS" BASIS, 10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // See the License for the specific language governing permissions and 12 // limitations under the License. 13 14 mod qos; 15 mod queue; 16 pub(crate) mod state; 17 use std::collections::HashMap; 18 use std::sync::atomic::Ordering; 19 use std::sync::Arc; 20 21 mod sql; 22 use qos::Qos; 23 use queue::RunningQueue; 24 use state::sql::SqlList; 25 26 use super::events::TaskManagerEvent; 27 use crate::config::Mode; 28 use crate::error::ErrorCode; 29 use crate::info::TaskInfo; 30 use crate::manage::database::RequestDb; 31 use crate::manage::notifier::Notifier; 32 use crate::manage::task_manager::TaskManagerTx; 33 use crate::service::active_counter::ActiveCounter; 34 use crate::service::client::ClientManagerEntry; 35 use crate::service::notification_bar::NotificationDispatcher; 36 use crate::service::run_count::RunCountManagerEntry; 37 use crate::task::config::Action; 38 use crate::task::info::State; 39 use crate::task::notify::WaitingCause; 40 use crate::task::reason::Reason; 41 use crate::task::request_task::RequestTask; 42 use crate::utils::get_current_timestamp; 43 44 const MILLISECONDS_IN_ONE_MONTH: u64 = 30 * 24 * 60 * 60 * 1000; 45 46 // Scheduler 的基本处理逻辑如下: 47 // 1. Scheduler 维护一个当前所有 运行中 和 48 // 待运行的任务优先级队列(scheduler.qos), 49 // 该队列仅保存任务的优先级信息和基础信息,当环境发生变化时, 50 // 将该优先级队列重新排序,并得到一系列优先级调节指令(QosChange), 51 // 这些指令的作用是指引运行队列将满足优先级排序的任务变为运行状态。 52 // 53 // 2. 得到指令后,将该指令作用于任务队列(scheduler.queue)。 54 // 任务队列保存当前正在运行的任务列表(scheduler.queue.running), 55 // 所以运行队列根据指令的内容, 将指令引导的那些任务置于运行任务列表, 56 // 并调节速率。对于那些当前正在执行,但此时又未得到运行权限的任务, 57 // 我们将其修改为Waiting状态,运行任务队列就更新完成了。 58 // 59 // 注意:未处于运行状态中的任务不会停留在内存中。 60 61 pub(crate) struct Scheduler { 62 qos: Qos, 63 running_queue: RunningQueue, 64 client_manager: ClientManagerEntry, 65 state_handler: state::Handler, 66 pub(crate) resort_scheduled: bool, 67 task_manager: TaskManagerTx, 68 } 69 70 impl Scheduler { init( tx: TaskManagerTx, runcount_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, active_counter: ActiveCounter, ) -> Scheduler71 pub(crate) fn init( 72 tx: TaskManagerTx, 73 runcount_manager: RunCountManagerEntry, 74 client_manager: ClientManagerEntry, 75 active_counter: ActiveCounter, 76 ) -> Scheduler { 77 let mut state_handler = state::Handler::new(tx.clone()); 78 let sql_list = state_handler.init(); 79 let db = RequestDb::get_instance(); 80 for sql in sql_list { 81 if let Err(e) = db.execute(&sql) { 82 error!("TaskManager update network failed {:?}", e); 83 }; 84 } 85 86 Self { 87 qos: Qos::new(), 88 running_queue: RunningQueue::new( 89 tx.clone(), 90 runcount_manager, 91 client_manager.clone(), 92 active_counter, 93 ), 94 client_manager, 95 state_handler, 96 resort_scheduled: false, 97 task_manager: tx, 98 } 99 } 100 get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>>101 pub(crate) fn get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>> { 102 self.running_queue.get_task(uid, task_id) 103 } 104 tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>>105 pub(crate) fn tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>> { 106 self.running_queue.tasks() 107 } 108 running_tasks(&self) -> usize109 pub(crate) fn running_tasks(&self) -> usize { 110 self.running_queue.running_tasks() 111 } 112 restore_all_tasks(&mut self)113 pub(crate) fn restore_all_tasks(&mut self) { 114 info!("reschedule restore all tasks"); 115 // Reschedule tasks based on the current `QOS` status. 116 self.schedule_if_not_scheduled(); 117 } 118 start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>119 pub(crate) fn start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 120 self.start_inner(uid, task_id, false) 121 } 122 resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>123 pub(crate) fn resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 124 self.start_inner(uid, task_id, true) 125 } 126 start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode>127 fn start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode> { 128 let database = RequestDb::get_instance(); 129 let info = RequestDb::get_instance() 130 .get_task_info(task_id) 131 .ok_or(ErrorCode::TaskNotFound)?; 132 133 if (is_resume && info.progress.common_data.state != State::Paused.repr) 134 || (!is_resume && info.progress.common_data.state == State::Paused.repr) 135 { 136 return Err(ErrorCode::TaskStateErr); 137 } 138 // Change `Waiting` so that it can be scheduled. 139 database.change_status(task_id, State::Waiting)?; 140 141 let info = RequestDb::get_instance() 142 .get_task_info(task_id) 143 .ok_or(ErrorCode::TaskNotFound)?; 144 if is_resume { 145 Notifier::resume(&self.client_manager, info.build_notify_data()); 146 } else { 147 // If the task is started, reset the task time. 148 database.update_task_time(task_id, 0); 149 } 150 151 if info.progress.is_finish() { 152 database.update_task_state(task_id, State::Completed, Reason::Default); 153 if let Some(info) = database.get_task_info(task_id) { 154 Notifier::complete(&self.client_manager, info.build_notify_data()); 155 } 156 } 157 158 if !self.check_config_satisfy(task_id)? { 159 return Ok(()); 160 }; 161 let qos_info = database 162 .get_task_qos_info(task_id) 163 .ok_or(ErrorCode::TaskNotFound)?; 164 self.qos.start_task(uid, qos_info); 165 self.schedule_if_not_scheduled(); 166 Ok(()) 167 } 168 pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>169 pub(crate) fn pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 170 let database = RequestDb::get_instance(); 171 database.change_status(task_id, State::Paused)?; 172 self.qos.remove_task(uid, task_id); 173 174 if self.running_queue.cancel_task(task_id, uid) { 175 self.running_queue.upload_resume.insert(task_id); 176 self.schedule_if_not_scheduled(); 177 } 178 let info = database 179 .get_task_info(task_id) 180 .ok_or(ErrorCode::TaskNotFound)?; 181 182 Notifier::pause(&self.client_manager, info.build_notify_data()); 183 Ok(()) 184 } 185 remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>186 pub(crate) fn remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 187 let database = RequestDb::get_instance(); 188 database.change_status(task_id, State::Removed)?; 189 self.qos.remove_task(uid, task_id); 190 191 if self.running_queue.cancel_task(task_id, uid) { 192 self.schedule_if_not_scheduled(); 193 } 194 database.remove_user_file_task(task_id); 195 let info = database 196 .get_task_info(task_id) 197 .ok_or(ErrorCode::TaskNotFound)?; 198 199 Notifier::remove(&self.client_manager, info.build_notify_data()); 200 Ok(()) 201 } 202 stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>203 pub(crate) fn stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> { 204 let database = RequestDb::get_instance(); 205 database.change_status(task_id, State::Stopped)?; 206 self.qos.remove_task(uid, task_id); 207 208 if self.running_queue.cancel_task(task_id, uid) { 209 self.schedule_if_not_scheduled(); 210 } 211 Ok(()) 212 } 213 set_max_speed( &mut self, uid: u64, task_id: u32, max_speed: i64, ) -> Result<(), ErrorCode>214 pub(crate) fn set_max_speed( 215 &mut self, 216 uid: u64, 217 task_id: u32, 218 max_speed: i64, 219 ) -> Result<(), ErrorCode> { 220 if let Some(task) = self.running_queue.get_task(uid, task_id) { 221 task.max_speed.store(max_speed, Ordering::SeqCst); 222 } 223 Ok(()) 224 } 225 task_set_mode( &mut self, uid: u64, task_id: u32, mode: Mode, ) -> Result<(), ErrorCode>226 pub(crate) fn task_set_mode( 227 &mut self, 228 uid: u64, 229 task_id: u32, 230 mode: Mode, 231 ) -> Result<(), ErrorCode> { 232 let database = RequestDb::get_instance(); 233 database.set_mode(task_id, mode)?; 234 235 if self.qos.task_set_mode(uid, task_id, mode) { 236 self.schedule_if_not_scheduled(); 237 } 238 if let Some(task) = self.running_queue.get_task_clone(uid, task_id) { 239 task.mode.store(mode.repr, Ordering::Release); 240 } 241 if mode == Mode::FrontEnd { 242 NotificationDispatcher::get_instance().unregister_task(uid, task_id, false); 243 } else if mode == Mode::BackGround { 244 NotificationDispatcher::get_instance().enable_task_progress_notification(task_id); 245 } 246 Ok(()) 247 } 248 task_completed(&mut self, uid: u64, task_id: u32)249 pub(crate) fn task_completed(&mut self, uid: u64, task_id: u32) { 250 info!("scheduler task {} completed", task_id); 251 self.running_queue.task_finish(uid, task_id); 252 253 let database = RequestDb::get_instance(); 254 if self.qos.remove_task(uid, task_id) { 255 self.schedule_if_not_scheduled(); 256 } 257 258 if let Some(info) = database.get_task_qos_info(task_id) { 259 if info.state == State::Failed.repr { 260 if let Some(task_info) = database.get_task_info(task_id) { 261 Scheduler::notify_fail(task_info, &self.client_manager, Reason::Default); 262 return; 263 } 264 } 265 266 if info.state != State::Running.repr && info.state != State::Waiting.repr { 267 return; 268 } 269 } 270 271 database.update_task_state(task_id, State::Completed, Reason::Default); 272 database.remove_user_file_task(task_id); 273 if let Some(info) = database.get_task_info(task_id) { 274 Notifier::complete(&self.client_manager, info.build_notify_data()); 275 NotificationDispatcher::get_instance().publish_success_notification(&info); 276 } 277 } 278 task_cancel( &mut self, uid: u64, task_id: u32, mode: Mode, task_count: &mut HashMap<u64, (usize, usize)>, )279 pub(crate) fn task_cancel( 280 &mut self, 281 uid: u64, 282 task_id: u32, 283 mode: Mode, 284 task_count: &mut HashMap<u64, (usize, usize)>, 285 ) { 286 info!("scheduler task {} canceled", task_id); 287 self.running_queue.task_finish(uid, task_id); 288 if self.running_queue.try_restart(uid, task_id) { 289 return; 290 } 291 292 let database = RequestDb::get_instance(); 293 let Some(info) = database.get_task_info(task_id) else { 294 error!("task {} not found in database", task_id); 295 NotificationDispatcher::get_instance().unregister_task(uid, task_id, true); 296 return; 297 }; 298 match State::from(info.progress.common_data.state) { 299 State::Running | State::Retrying => { 300 info!("task {} waiting for task limits", task_id); 301 RequestDb::get_instance().update_task_state( 302 task_id, 303 State::Waiting, 304 Reason::RunningTaskMeetLimits, 305 ); 306 Notifier::waiting(&self.client_manager, task_id, WaitingCause::TaskQueue); 307 } 308 State::Failed => { 309 info!("task {} cancel with state Failed", task_id); 310 Scheduler::reduce_task_count(uid, mode, task_count); 311 let reason = info.common_data.reason; 312 Scheduler::notify_fail(info, &self.client_manager, Reason::from(reason)); 313 } 314 State::Stopped | State::Removed => { 315 info!("task {} cancel with state Stopped or Removed", task_id); 316 NotificationDispatcher::get_instance().unregister_task(uid, task_id, true); 317 self.running_queue.try_restart(uid, task_id); 318 } 319 State::Waiting => { 320 info!("task {} cancel with state Waiting", task_id); 321 let reason = match info.common_data.reason { 322 reason if reason == Reason::AppBackgroundOrTerminate.repr => { 323 WaitingCause::AppState 324 } 325 reason 326 if reason == Reason::NetworkOffline.repr 327 || reason == Reason::UnsupportedNetworkType.repr => 328 { 329 WaitingCause::Network 330 } 331 reason if reason == Reason::RunningTaskMeetLimits.repr => { 332 WaitingCause::TaskQueue 333 } 334 reason if reason == Reason::AccountStopped.repr => WaitingCause::UserState, 335 reason => { 336 error!("task {} cancel with other reason {}", task_id, reason); 337 WaitingCause::TaskQueue 338 } 339 }; 340 Notifier::waiting(&self.client_manager, task_id, reason); 341 } 342 state => { 343 info!( 344 "task {} cancel state {:?} reason {:?}", 345 task_id, 346 state, 347 Reason::from(info.common_data.reason) 348 ); 349 } 350 } 351 } 352 task_failed(&mut self, uid: u64, task_id: u32, reason: Reason)353 pub(crate) fn task_failed(&mut self, uid: u64, task_id: u32, reason: Reason) { 354 info!("scheduler task {} failed", task_id); 355 self.running_queue.task_finish(uid, task_id); 356 357 let database = RequestDb::get_instance(); 358 359 if self.qos.remove_task(uid, task_id) { 360 self.schedule_if_not_scheduled(); 361 } 362 363 if let Some(info) = database.get_task_qos_info(task_id) { 364 if info.state != State::Running.repr && info.state != State::Waiting.repr { 365 return; 366 } 367 } 368 369 database.update_task_state(task_id, State::Failed, reason); 370 if let Some(info) = database.get_task_info(task_id) { 371 let reason = info.common_data.reason; 372 Scheduler::notify_fail(info, &self.client_manager, Reason::from(reason)); 373 } 374 } 375 notify_fail(info: TaskInfo, client_manager: &ClientManagerEntry, reason: Reason)376 fn notify_fail(info: TaskInfo, client_manager: &ClientManagerEntry, reason: Reason) { 377 Notifier::fail(client_manager, info.build_notify_data()); 378 Notifier::faults(info.common_data.task_id, client_manager, reason); 379 NotificationDispatcher::get_instance().publish_failed_notification(&info); 380 #[cfg(feature = "oh")] 381 Self::sys_event(info); 382 } 383 reduce_task_count( uid: u64, mode: Mode, task_count: &mut HashMap<u64, (usize, usize)>, )384 pub(crate) fn reduce_task_count( 385 uid: u64, 386 mode: Mode, 387 task_count: &mut HashMap<u64, (usize, usize)>, 388 ) { 389 if let Some((front, back)) = task_count.get_mut(&uid) { 390 match mode { 391 Mode::FrontEnd => { 392 if *front > 0 { 393 *front -= 1; 394 } 395 } 396 _ => { 397 if *back > 0 { 398 *back -= 1; 399 } 400 } 401 } 402 } 403 } 404 405 #[cfg(feature = "oh")] sys_event(info: TaskInfo)406 pub(crate) fn sys_event(info: TaskInfo) { 407 use crate::sys_event::sys_task_fault; 408 409 let index = info.progress.common_data.index; 410 let size = info.file_specs.len(); 411 let action = match info.action() { 412 Action::Download => "DOWNLOAD", 413 Action::Upload => "UPLOAD", 414 _ => "UNKNOWN", 415 }; 416 let reason = Reason::from(info.common_data.reason); 417 418 sys_task_fault( 419 action, 420 size as i32, 421 (size - index) as i32, 422 index as i32, 423 reason.repr as i32, 424 ); 425 } 426 on_state_change<T, F>(&mut self, f: F, t: T) where F: FnOnce(&mut state::Handler, T) -> Option<SqlList>,427 pub(crate) fn on_state_change<T, F>(&mut self, f: F, t: T) 428 where 429 F: FnOnce(&mut state::Handler, T) -> Option<SqlList>, 430 { 431 let Some(sql_list) = f(&mut self.state_handler, t) else { 432 return; 433 }; 434 let db = RequestDb::get_instance(); 435 for sql in sql_list { 436 if let Err(e) = db.execute(&sql) { 437 error!("TaskManager update network failed {:?}", e); 438 }; 439 } 440 self.reload_all_tasks(); 441 } 442 reload_all_tasks(&mut self)443 pub(crate) fn reload_all_tasks(&mut self) { 444 self.qos.reload_all_tasks(); 445 self.schedule_if_not_scheduled(); 446 } 447 on_rss_change(&mut self, level: i32)448 pub(crate) fn on_rss_change(&mut self, level: i32) { 449 if let Some(new_rss) = self.state_handler.update_rss_level(level) { 450 self.qos.change_rss(new_rss); 451 self.schedule_if_not_scheduled(); 452 } 453 } 454 schedule_if_not_scheduled(&mut self)455 fn schedule_if_not_scheduled(&mut self) { 456 if self.resort_scheduled { 457 return; 458 } 459 self.resort_scheduled = true; 460 let task_manager = self.task_manager.clone(); 461 task_manager.send_event(TaskManagerEvent::Reschedule); 462 } 463 reschedule(&mut self)464 pub(crate) fn reschedule(&mut self) { 465 self.resort_scheduled = false; 466 let changes = self.qos.reschedule(&self.state_handler); 467 let mut qos_remove_queue = vec![]; 468 self.running_queue 469 .reschedule(changes, &mut qos_remove_queue); 470 for (uid, task_id) in qos_remove_queue.iter() { 471 self.qos.apps.remove_task(*uid, *task_id); 472 } 473 if !qos_remove_queue.is_empty() { 474 self.reload_all_tasks(); 475 } 476 } 477 check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode>478 pub(crate) fn check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode> { 479 let database = RequestDb::get_instance(); 480 let config = database 481 .get_task_config(task_id) 482 .ok_or(ErrorCode::TaskNotFound)?; 483 484 if let Err(reason) = config.satisfy_network(self.state_handler.network()) { 485 info!( 486 "task {} started, waiting for network {:?}", 487 task_id, 488 self.state_handler.network() 489 ); 490 491 database.update_task_state(task_id, State::Waiting, reason); 492 Notifier::waiting(&self.client_manager, task_id, WaitingCause::Network); 493 return Ok(false); 494 } 495 496 if !config.satisfy_foreground(self.state_handler.foreground_abilities()) { 497 info!( 498 "task {} started, waiting for app {}", 499 task_id, config.common_data.uid 500 ); 501 database.update_task_state(task_id, State::Waiting, Reason::AppBackgroundOrTerminate); 502 Notifier::waiting(&self.client_manager, task_id, WaitingCause::AppState); 503 return Ok(false); 504 } 505 Ok(true) 506 } 507 clear_timeout_tasks(&mut self)508 pub(crate) fn clear_timeout_tasks(&mut self) { 509 let current_time = get_current_timestamp(); 510 let timeout_tasks = self 511 .tasks() 512 .filter(|task| current_time - task.ctime > MILLISECONDS_IN_ONE_MONTH) 513 .cloned() 514 .collect::<Vec<_>>(); 515 if timeout_tasks.is_empty() { 516 return; 517 } 518 let database = RequestDb::get_instance(); 519 for task in timeout_tasks { 520 if database 521 .change_status(task.task_id(), State::Stopped) 522 .is_ok() 523 { 524 self.qos.apps.remove_task(task.uid(), task.task_id()); 525 } 526 } 527 self.schedule_if_not_scheduled(); 528 } 529 retry_all_tasks(&mut self)530 pub(crate) fn retry_all_tasks(&mut self) { 531 self.running_queue.retry_all_tasks(); 532 } 533 } 534 535 impl RequestDb { change_status(&self, task_id: u32, new_state: State) -> Result<(), ErrorCode>536 fn change_status(&self, task_id: u32, new_state: State) -> Result<(), ErrorCode> { 537 let info = RequestDb::get_instance() 538 .get_task_info(task_id) 539 .ok_or(ErrorCode::TaskNotFound)?; 540 541 let old_state = info.progress.common_data.state; 542 if old_state == new_state.repr { 543 if new_state == State::Removed { 544 return Err(ErrorCode::TaskNotFound); 545 } else { 546 return Err(ErrorCode::TaskStateErr); 547 } 548 } 549 let sql = match new_state { 550 State::Paused => sql::pause_task(task_id), 551 State::Running => sql::start_task(task_id), 552 State::Stopped => sql::stop_task(task_id), 553 State::Removed => sql::remove_task(task_id), 554 State::Waiting => sql::start_task(task_id), 555 _ => return Err(ErrorCode::Other), 556 }; 557 558 RequestDb::get_instance() 559 .execute(&sql) 560 .map_err(|_| ErrorCode::SystemApi)?; 561 562 let info = RequestDb::get_instance() 563 .get_task_info(task_id) 564 .ok_or(ErrorCode::SystemApi)?; 565 if info.progress.common_data.state != new_state.repr { 566 return Err(ErrorCode::TaskStateErr); 567 } 568 569 if (old_state == State::Initialized.repr 570 || old_state == State::Waiting.repr 571 || old_state == State::Paused.repr) 572 && (new_state == State::Stopped || new_state == State::Removed) 573 { 574 NotificationDispatcher::get_instance().unregister_task(info.uid(), task_id, true); 575 } 576 Ok(()) 577 } 578 set_mode(&self, task_id: u32, mode: Mode) -> Result<(), ErrorCode>579 fn set_mode(&self, task_id: u32, mode: Mode) -> Result<(), ErrorCode> { 580 let info = RequestDb::get_instance() 581 .get_task_info(task_id) 582 .ok_or(ErrorCode::TaskNotFound)?; 583 let old_mode = info.common_data.mode; 584 if old_mode == mode.repr { 585 return Ok(()); 586 } 587 let sql = sql::task_set_mode(task_id, mode); 588 RequestDb::get_instance() 589 .execute(&sql) 590 .map_err(|_| ErrorCode::SystemApi)?; 591 let info = RequestDb::get_instance() 592 .get_task_info(task_id) 593 .ok_or(ErrorCode::SystemApi)?; 594 if info.common_data.mode != mode.repr { 595 return Err(ErrorCode::TaskStateErr); 596 } 597 Ok(()) 598 } 599 } 600