• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 mod qos;
15 mod queue;
16 pub(crate) mod state;
17 use std::collections::HashMap;
18 use std::sync::Arc;
19 mod sql;
20 use qos::Qos;
21 use queue::RunningQueue;
22 use state::sql::SqlList;
23 
24 use super::events::TaskManagerEvent;
25 use crate::config::Mode;
26 use crate::error::ErrorCode;
27 use crate::info::TaskInfo;
28 use crate::manage::database::RequestDb;
29 use crate::manage::notifier::Notifier;
30 use crate::manage::task_manager::TaskManagerTx;
31 use crate::service::client::ClientManagerEntry;
32 use crate::service::notification_bar::NotificationDispatcher;
33 use crate::service::run_count::RunCountManagerEntry;
34 use crate::task::config::Action;
35 use crate::task::info::State;
36 use crate::task::reason::Reason;
37 use crate::task::request_task::RequestTask;
38 use crate::utils::get_current_timestamp;
39 
/// Number of milliseconds in 30 days. `Scheduler::clear_timeout_tasks` stops
/// every in-memory task whose age exceeds this value.
const MILLISECONDS_IN_ONE_MONTH: u64 = 30 * 24 * 60 * 60 * 1000;
41 
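// The basic processing logic of the Scheduler is as follows:
// 1. The Scheduler maintains a priority queue (scheduler.qos) of all tasks
//    that are currently running or waiting to run. This queue stores only
//    the priority information and basic information of each task. When the
//    environment changes, the priority queue is re-sorted, which yields a
//    series of priority adjustment instructions (QosChange). These
//    instructions direct the running queue to move the tasks that satisfy
//    the priority ordering into the running state.
//
// 2. Once the instructions are obtained, they are applied to the task queue
//    (scheduler.queue). The task queue holds the list of tasks that are
//    currently running (scheduler.queue.running), so the running queue
//    follows the instructions: it places the designated tasks into the
//    running list and adjusts their rates. Tasks that are currently
//    executing but were not granted permission to run this time are changed
//    to the Waiting state, which completes the update of the running queue.
//
// Note: tasks that are not in the running state do not stay in memory.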
56 
/// Coordinates task scheduling: it owns the QoS bookkeeping, the queue of
/// in-memory tasks and the environment state handler, and posts
/// `TaskManagerEvent::Reschedule` whenever the ordering has to be recomputed.
pub(crate) struct Scheduler {
    /// Priority bookkeeping; `reschedule` asks it for the changes to apply.
    qos: Qos,
    /// Queue of tasks held in memory; task lookups and iteration read from it.
    running_queue: RunningQueue,
    /// Handle passed to `Notifier` when task events are reported to clients.
    client_manager: ClientManagerEntry,
    /// Environment state (network, foreground abilities, RSS level) handler.
    state_handler: state::Handler,
    /// `true` while a `Reschedule` event has been posted and not yet handled;
    /// cleared at the start of `reschedule`.
    pub(crate) resort_scheduled: bool,
    /// Sender used to post `TaskManagerEvent::Reschedule`.
    task_manager: TaskManagerTx,
}
65 
66 impl Scheduler {
init( tx: TaskManagerTx, runcount_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, ) -> Scheduler67     pub(crate) fn init(
68         tx: TaskManagerTx,
69         runcount_manager: RunCountManagerEntry,
70         client_manager: ClientManagerEntry,
71     ) -> Scheduler {
72         let mut state_handler = state::Handler::new(tx.clone());
73         let sql_list = state_handler.init();
74         let db = RequestDb::get_instance();
75         for sql in sql_list {
76             if let Err(e) = db.execute(&sql) {
77                 error!("TaskManager update network failed {:?}", e);
78             };
79         }
80 
81         Self {
82             qos: Qos::new(),
83             running_queue: RunningQueue::new(tx.clone(), runcount_manager, client_manager.clone()),
84             client_manager,
85             state_handler,
86             resort_scheduled: false,
87             task_manager: tx,
88         }
89     }
90 
get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>>91     pub(crate) fn get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>> {
92         self.running_queue.get_task(uid, task_id)
93     }
94 
tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>>95     pub(crate) fn tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>> {
96         self.running_queue.tasks()
97     }
98 
running_tasks(&self) -> usize99     pub(crate) fn running_tasks(&self) -> usize {
100         self.running_queue.running_tasks()
101     }
102 
dump_tasks(&self)103     pub(crate) fn dump_tasks(&self) {
104         self.running_queue.dump_tasks();
105     }
106 
restore_all_tasks(&mut self)107     pub(crate) fn restore_all_tasks(&mut self) {
108         info!("reschedule restore all tasks");
109         // Reschedule tasks based on the current `QOS` status.
110         self.schedule_if_not_scheduled();
111     }
112 
start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>113     pub(crate) fn start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
114         self.start_inner(uid, task_id, false)
115     }
116 
resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>117     pub(crate) fn resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
118         self.start_inner(uid, task_id, true)
119     }
120 
start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode>121     fn start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode> {
122         let database = RequestDb::get_instance();
123         let info = RequestDb::get_instance()
124             .get_task_info(task_id)
125             .ok_or(ErrorCode::TaskNotFound)?;
126 
127         if (is_resume && info.progress.common_data.state != State::Paused.repr)
128             || (!is_resume && info.progress.common_data.state == State::Paused.repr)
129         {
130             return Err(ErrorCode::TaskStateErr);
131         }
132         // Change `Waiting` so that it can be scheduled.
133         database.change_status(task_id, State::Waiting)?;
134 
135         let info = RequestDb::get_instance()
136             .get_task_info(task_id)
137             .ok_or(ErrorCode::TaskNotFound)?;
138         if is_resume {
139             Notifier::resume(&self.client_manager, info.build_notify_data());
140         }
141 
142         if info.progress.is_finish() {
143             database.update_task_state(task_id, State::Completed, Reason::Default);
144             if let Some(info) = database.get_task_info(task_id) {
145                 Notifier::complete(&self.client_manager, info.build_notify_data());
146             }
147         }
148 
149         if !self.check_config_satisfy(task_id)? {
150             return Ok(());
151         };
152         let qos_info = database
153             .get_task_qos_info(task_id)
154             .ok_or(ErrorCode::TaskNotFound)?;
155         self.qos.start_task(uid, qos_info);
156         self.schedule_if_not_scheduled();
157         Ok(())
158     }
159 
pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>160     pub(crate) fn pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
161         let database = RequestDb::get_instance();
162         database.change_status(task_id, State::Paused)?;
163 
164         if let Some(info) = database.get_task_info(task_id) {
165             Notifier::pause(&self.client_manager, info.build_notify_data());
166         }
167         self.running_queue.upload_resume.insert(task_id);
168         if self.qos.remove_task(uid, task_id) {
169             self.schedule_if_not_scheduled();
170         }
171         Ok(())
172     }
173 
remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>174     pub(crate) fn remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
175         let database = RequestDb::get_instance();
176         database.change_status(task_id, State::Removed)?;
177         let info = database
178             .get_task_info(task_id)
179             .ok_or(ErrorCode::TaskNotFound)?;
180 
181         Notifier::remove(&self.client_manager, info.build_notify_data());
182 
183         if self.qos.remove_task(uid, task_id) {
184             self.schedule_if_not_scheduled();
185         }
186         Ok(())
187     }
188 
stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>189     pub(crate) fn stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
190         let database = RequestDb::get_instance();
191         database.change_status(task_id, State::Stopped)?;
192 
193         if self.qos.remove_task(uid, task_id) {
194             self.schedule_if_not_scheduled();
195         }
196         Ok(())
197     }
198 
task_completed(&mut self, uid: u64, task_id: u32)199     pub(crate) fn task_completed(&mut self, uid: u64, task_id: u32) {
200         info!("scheduler task {} completed", task_id);
201         self.running_queue.task_finish(uid, task_id);
202 
203         let database = RequestDb::get_instance();
204         if self.qos.remove_task(uid, task_id) {
205             self.schedule_if_not_scheduled();
206         }
207 
208         if let Some(info) = database.get_task_qos_info(task_id) {
209             if info.state == State::Failed.repr {
210                 if let Some(task_info) = database.get_task_info(task_id) {
211                     Scheduler::notify_fail(task_info, &self.client_manager);
212                     return;
213                 }
214             }
215 
216             if info.state != State::Running.repr && info.state != State::Waiting.repr {
217                 return;
218             }
219         }
220 
221         database.update_task_state(task_id, State::Completed, Reason::Default);
222         if let Some(info) = database.get_task_info(task_id) {
223             Notifier::complete(&self.client_manager, info.build_notify_data());
224             NotificationDispatcher::get_instance().publish_success_notification(&info);
225         }
226     }
227 
task_cancel( &mut self, uid: u64, task_id: u32, mode: Mode, task_count: &mut HashMap<u64, (usize, usize)>, )228     pub(crate) fn task_cancel(
229         &mut self,
230         uid: u64,
231         task_id: u32,
232         mode: Mode,
233         task_count: &mut HashMap<u64, (usize, usize)>,
234     ) {
235         info!("scheduler task {} canceled", task_id);
236         self.running_queue.task_finish(uid, task_id);
237         if self.running_queue.try_restart(uid, task_id) {
238             return;
239         }
240         let database = RequestDb::get_instance();
241         let Some(info) = database.get_task_info(task_id) else {
242             error!("task {} not found in database", task_id);
243             NotificationDispatcher::get_instance().unregister_task(uid, task_id);
244             return;
245         };
246         match State::from(info.progress.common_data.state) {
247             State::Running | State::Retrying => {
248                 info!("task {} waiting for task limits", task_id);
249                 RequestDb::get_instance().update_task_state(
250                     task_id,
251                     State::Waiting,
252                     Reason::RunningTaskMeetLimits,
253                 );
254             }
255             State::Failed => {
256                 info!("task {} cancel with state Failed", task_id);
257                 Scheduler::reduce_task_count(uid, mode, task_count);
258                 Scheduler::notify_fail(info, &self.client_manager);
259             }
260             State::Stopped | State::Removed => {
261                 info!("task {} cancel with state Stopped or Removed", task_id);
262                 NotificationDispatcher::get_instance().unregister_task(uid, task_id);
263             }
264             state => {
265                 info!(
266                     "task {} cancel state {:?} reason {:?}",
267                     task_id,
268                     state,
269                     Reason::from(info.common_data.reason)
270                 );
271             }
272         }
273     }
274 
task_failed(&mut self, uid: u64, task_id: u32, reason: Reason)275     pub(crate) fn task_failed(&mut self, uid: u64, task_id: u32, reason: Reason) {
276         info!("scheduler task {} failed", task_id);
277         self.running_queue.task_finish(uid, task_id);
278 
279         let database = RequestDb::get_instance();
280 
281         if self.qos.remove_task(uid, task_id) {
282             self.schedule_if_not_scheduled();
283         }
284 
285         if let Some(info) = database.get_task_qos_info(task_id) {
286             if info.state != State::Running.repr && info.state != State::Waiting.repr {
287                 return;
288             }
289         }
290 
291         database.update_task_state(task_id, State::Failed, reason);
292         if let Some(info) = database.get_task_info(task_id) {
293             Scheduler::notify_fail(info, &self.client_manager);
294         }
295     }
296 
notify_fail(info: TaskInfo, client_manager: &ClientManagerEntry)297     fn notify_fail(info: TaskInfo, client_manager: &ClientManagerEntry) {
298         Notifier::fail(client_manager, info.build_notify_data());
299         NotificationDispatcher::get_instance().publish_failed_notification(&info);
300         #[cfg(feature = "oh")]
301         Self::sys_event(info);
302     }
303 
reduce_task_count( uid: u64, mode: Mode, task_count: &mut HashMap<u64, (usize, usize)>, )304     pub(crate) fn reduce_task_count(
305         uid: u64,
306         mode: Mode,
307         task_count: &mut HashMap<u64, (usize, usize)>,
308     ) {
309         if let Some((front, back)) = task_count.get_mut(&uid) {
310             match mode {
311                 Mode::FrontEnd => {
312                     if *front > 0 {
313                         *front -= 1;
314                     }
315                 }
316                 _ => {
317                     if *back > 0 {
318                         *back -= 1;
319                     }
320                 }
321             }
322         }
323     }
324 
325     #[cfg(feature = "oh")]
sys_event(info: TaskInfo)326     pub(crate) fn sys_event(info: TaskInfo) {
327         use hisysevent::{build_number_param, build_str_param};
328 
329         use crate::sys_event::SysEvent;
330 
331         let index = info.progress.common_data.index;
332         let size = info.file_specs.len();
333         let action = match info.action() {
334             Action::Download => "DOWNLOAD",
335             Action::Upload => "UPLOAD",
336             _ => "UNKNOWN",
337         };
338         let reason = Reason::from(info.common_data.reason);
339 
340         SysEvent::task_fault()
341             .param(build_str_param!(crate::sys_event::TASKS_TYPE, action))
342             .param(build_number_param!(
343                 crate::sys_event::TOTAL_FILE_NUM,
344                 size as i32
345             ))
346             .param(build_number_param!(
347                 crate::sys_event::FAIL_FILE_NUM,
348                 (size - index) as i32
349             ))
350             .param(build_number_param!(
351                 crate::sys_event::SUCCESS_FILE_NUM,
352                 index as i32
353             ))
354             .param(build_number_param!(
355                 crate::sys_event::ERROR_INFO,
356                 reason.repr as i32
357             ))
358             .write();
359     }
360 
on_state_change<T, F>(&mut self, f: F, t: T) where F: FnOnce(&mut state::Handler, T) -> Option<SqlList>,361     pub(crate) fn on_state_change<T, F>(&mut self, f: F, t: T)
362     where
363         F: FnOnce(&mut state::Handler, T) -> Option<SqlList>,
364     {
365         let Some(sql_list) = f(&mut self.state_handler, t) else {
366             return;
367         };
368         let db = RequestDb::get_instance();
369         for sql in sql_list {
370             if let Err(e) = db.execute(&sql) {
371                 error!("TaskManager update network failed {:?}", e);
372             };
373         }
374         self.reload_all_tasks();
375     }
376 
reload_all_tasks(&mut self)377     pub(crate) fn reload_all_tasks(&mut self) {
378         self.qos.reload_all_tasks();
379         self.schedule_if_not_scheduled();
380     }
381 
on_rss_change(&mut self, level: i32)382     pub(crate) fn on_rss_change(&mut self, level: i32) {
383         if let Some(new_rss) = self.state_handler.update_rss_level(level) {
384             self.qos.change_rss(new_rss);
385             self.schedule_if_not_scheduled();
386         }
387     }
388 
schedule_if_not_scheduled(&mut self)389     fn schedule_if_not_scheduled(&mut self) {
390         if self.resort_scheduled {
391             return;
392         }
393         self.resort_scheduled = true;
394         let task_manager = self.task_manager.clone();
395         task_manager.send_event(TaskManagerEvent::Reschedule);
396     }
397 
reschedule(&mut self)398     pub(crate) fn reschedule(&mut self) {
399         self.resort_scheduled = false;
400         let changes = self.qos.reschedule(&self.state_handler);
401         let mut qos_remove_queue = vec![];
402         self.running_queue
403             .reschedule(changes, &mut qos_remove_queue);
404         for (uid, task_id) in qos_remove_queue.iter() {
405             self.qos.apps.remove_task(*uid, *task_id);
406         }
407         if !qos_remove_queue.is_empty() {
408             self.reload_all_tasks();
409         }
410     }
411 
check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode>412     pub(crate) fn check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode> {
413         let database = RequestDb::get_instance();
414         let config = database
415             .get_task_config(task_id)
416             .ok_or(ErrorCode::TaskNotFound)?;
417 
418         if let Err(reason) = config.satisfy_network(self.state_handler.network()) {
419             info!(
420                 "task {} started, waiting for network {:?}",
421                 task_id,
422                 self.state_handler.network()
423             );
424 
425             database.update_task_state(task_id, State::Waiting, reason);
426             return Ok(false);
427         }
428 
429         if !config.satisfy_foreground(self.state_handler.foreground_abilities()) {
430             info!(
431                 "task {} started, waiting for app {}",
432                 task_id, config.common_data.uid
433             );
434             database.update_task_state(task_id, State::Waiting, Reason::AppBackgroundOrTerminate);
435             return Ok(false);
436         }
437         Ok(true)
438     }
439 
clear_timeout_tasks(&mut self)440     pub(crate) fn clear_timeout_tasks(&mut self) {
441         let current_time = get_current_timestamp();
442         let timeout_tasks = self
443             .tasks()
444             .filter(|task| current_time - task.ctime > MILLISECONDS_IN_ONE_MONTH)
445             .cloned()
446             .collect::<Vec<_>>();
447         if timeout_tasks.is_empty() {
448             return;
449         }
450         let database = RequestDb::get_instance();
451         for task in timeout_tasks {
452             if database
453                 .change_status(task.task_id(), State::Stopped)
454                 .is_ok()
455             {
456                 self.qos.apps.remove_task(task.uid(), task.task_id());
457             }
458         }
459         self.schedule_if_not_scheduled();
460     }
461 
retry_all_tasks(&mut self)462     pub(crate) fn retry_all_tasks(&mut self) {
463         self.running_queue.retry_all_tasks();
464     }
465 }
466 
467 impl RequestDb {
change_status(&self, task_id: u32, new_state: State) -> Result<(), ErrorCode>468     fn change_status(&self, task_id: u32, new_state: State) -> Result<(), ErrorCode> {
469         let info = RequestDb::get_instance()
470             .get_task_info(task_id)
471             .ok_or(ErrorCode::TaskNotFound)?;
472 
473         let old_state = info.progress.common_data.state;
474         if old_state == new_state.repr {
475             if new_state == State::Removed {
476                 return Err(ErrorCode::TaskNotFound);
477             } else {
478                 return Err(ErrorCode::TaskStateErr);
479             }
480         }
481         let sql = match new_state {
482             State::Paused => sql::pause_task(task_id),
483             State::Running => sql::start_task(task_id),
484             State::Stopped => sql::stop_task(task_id),
485             State::Removed => sql::remove_task(task_id),
486             State::Waiting => sql::start_task(task_id),
487             _ => return Err(ErrorCode::Other),
488         };
489 
490         RequestDb::get_instance()
491             .execute(&sql)
492             .map_err(|_| ErrorCode::SystemApi)?;
493 
494         let info = RequestDb::get_instance()
495             .get_task_info(task_id)
496             .ok_or(ErrorCode::SystemApi)?;
497         if info.progress.common_data.state != new_state.repr {
498             return Err(ErrorCode::TaskStateErr);
499         }
500 
501         if (old_state == State::Initialized.repr
502             || old_state == State::Waiting.repr
503             || old_state == State::Paused.repr)
504             && (new_state == State::Stopped || new_state == State::Removed)
505         {
506             NotificationDispatcher::get_instance().unregister_task(info.uid(), task_id);
507         }
508         Ok(())
509     }
510 }
511