• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 mod qos;
15 mod queue;
16 pub(crate) mod state;
17 use std::collections::HashMap;
18 use std::sync::atomic::Ordering;
19 use std::sync::Arc;
20 
21 mod sql;
22 use qos::Qos;
23 use queue::RunningQueue;
24 use state::sql::SqlList;
25 
26 use super::events::TaskManagerEvent;
27 use crate::config::Mode;
28 use crate::error::ErrorCode;
29 use crate::info::TaskInfo;
30 use crate::manage::database::RequestDb;
31 use crate::manage::notifier::Notifier;
32 use crate::manage::task_manager::TaskManagerTx;
33 use crate::service::active_counter::ActiveCounter;
34 use crate::service::client::ClientManagerEntry;
35 use crate::service::notification_bar::NotificationDispatcher;
36 use crate::service::run_count::RunCountManagerEntry;
37 use crate::task::config::Action;
38 use crate::task::info::State;
39 use crate::task::notify::WaitingCause;
40 use crate::task::reason::Reason;
41 use crate::task::request_task::RequestTask;
42 use crate::utils::get_current_timestamp;
43 
/// Thirty days expressed in milliseconds; `Scheduler::clear_timeout_tasks`
/// compares a task's age against this threshold.
const MILLISECONDS_IN_ONE_MONTH: u64 = 30 * 24 * 60 * 60 * 1000;
45 
// The basic processing logic of the Scheduler is as follows:
// 1. The Scheduler maintains a priority queue (scheduler.qos) of all tasks
//    that are currently running or waiting to run.
// This queue only stores each task's priority information and basic
// information. When the environment changes, the priority queue is re-sorted
// and yields a series of priority adjustment directives (QosChange).
// These directives guide the running queue to move the tasks that satisfy
// the priority ordering into the running state.
//
// 2. Once obtained, the directives are applied to the task queue
// (scheduler.queue). The task queue holds the list of currently running
// tasks (scheduler.queue.running), so based on the directives the running
// queue places the indicated tasks into the running list and adjusts their
// rates. Tasks that are currently executing but have not been granted
// permission to run are changed to the Waiting state, which completes the
// update of the running queue.
//
// Note: tasks that are not in the running state are not kept in memory.

/// Coordinates the priority bookkeeping (`qos`) with the set of running
/// tasks (`running_queue`).
pub(crate) struct Scheduler {
    /// Priority information for running and waiting tasks.
    qos: Qos,
    /// The tasks that are currently running.
    running_queue: RunningQueue,
    /// Handle passed to `Notifier` when reporting task events to clients.
    client_manager: ClientManagerEntry,
    /// Source of the network, foreground and RSS state used when scheduling.
    state_handler: state::Handler,
    /// `true` once a `Reschedule` event has been sent and not yet handled;
    /// cleared at the start of `reschedule`.
    pub(crate) resort_scheduled: bool,
    /// Sender used to post events (e.g. `Reschedule`) to the task manager.
    task_manager: TaskManagerTx,
}
69 
70 impl Scheduler {
init( tx: TaskManagerTx, runcount_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, active_counter: ActiveCounter, ) -> Scheduler71     pub(crate) fn init(
72         tx: TaskManagerTx,
73         runcount_manager: RunCountManagerEntry,
74         client_manager: ClientManagerEntry,
75         active_counter: ActiveCounter,
76     ) -> Scheduler {
77         let mut state_handler = state::Handler::new(tx.clone());
78         let sql_list = state_handler.init();
79         let db = RequestDb::get_instance();
80         for sql in sql_list {
81             if let Err(e) = db.execute(&sql) {
82                 error!("TaskManager update network failed {:?}", e);
83             };
84         }
85 
86         Self {
87             qos: Qos::new(),
88             running_queue: RunningQueue::new(
89                 tx.clone(),
90                 runcount_manager,
91                 client_manager.clone(),
92                 active_counter,
93             ),
94             client_manager,
95             state_handler,
96             resort_scheduled: false,
97             task_manager: tx,
98         }
99     }
100 
get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>>101     pub(crate) fn get_task(&self, uid: u64, task_id: u32) -> Option<&Arc<RequestTask>> {
102         self.running_queue.get_task(uid, task_id)
103     }
104 
tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>>105     pub(crate) fn tasks(&self) -> impl Iterator<Item = &Arc<RequestTask>> {
106         self.running_queue.tasks()
107     }
108 
running_tasks(&self) -> usize109     pub(crate) fn running_tasks(&self) -> usize {
110         self.running_queue.running_tasks()
111     }
112 
restore_all_tasks(&mut self)113     pub(crate) fn restore_all_tasks(&mut self) {
114         info!("reschedule restore all tasks");
115         // Reschedule tasks based on the current `QOS` status.
116         self.schedule_if_not_scheduled();
117     }
118 
start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>119     pub(crate) fn start_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
120         self.start_inner(uid, task_id, false)
121     }
122 
resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>123     pub(crate) fn resume_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
124         self.start_inner(uid, task_id, true)
125     }
126 
start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode>127     fn start_inner(&mut self, uid: u64, task_id: u32, is_resume: bool) -> Result<(), ErrorCode> {
128         let database = RequestDb::get_instance();
129         let info = RequestDb::get_instance()
130             .get_task_info(task_id)
131             .ok_or(ErrorCode::TaskNotFound)?;
132 
133         if (is_resume && info.progress.common_data.state != State::Paused.repr)
134             || (!is_resume && info.progress.common_data.state == State::Paused.repr)
135         {
136             return Err(ErrorCode::TaskStateErr);
137         }
138         // Change `Waiting` so that it can be scheduled.
139         database.change_status(task_id, State::Waiting)?;
140 
141         let info = RequestDb::get_instance()
142             .get_task_info(task_id)
143             .ok_or(ErrorCode::TaskNotFound)?;
144         if is_resume {
145             Notifier::resume(&self.client_manager, info.build_notify_data());
146         } else {
147             // If the task is started, reset the task time.
148             database.update_task_time(task_id, 0);
149         }
150 
151         if info.progress.is_finish() {
152             database.update_task_state(task_id, State::Completed, Reason::Default);
153             if let Some(info) = database.get_task_info(task_id) {
154                 Notifier::complete(&self.client_manager, info.build_notify_data());
155             }
156         }
157 
158         if !self.check_config_satisfy(task_id)? {
159             return Ok(());
160         };
161         let qos_info = database
162             .get_task_qos_info(task_id)
163             .ok_or(ErrorCode::TaskNotFound)?;
164         self.qos.start_task(uid, qos_info);
165         self.schedule_if_not_scheduled();
166         Ok(())
167     }
168 
pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>169     pub(crate) fn pause_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
170         let database = RequestDb::get_instance();
171         database.change_status(task_id, State::Paused)?;
172         self.qos.remove_task(uid, task_id);
173 
174         if self.running_queue.cancel_task(task_id, uid) {
175             self.running_queue.upload_resume.insert(task_id);
176             self.schedule_if_not_scheduled();
177         }
178         let info = database
179             .get_task_info(task_id)
180             .ok_or(ErrorCode::TaskNotFound)?;
181 
182         Notifier::pause(&self.client_manager, info.build_notify_data());
183         Ok(())
184     }
185 
remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>186     pub(crate) fn remove_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
187         let database = RequestDb::get_instance();
188         database.change_status(task_id, State::Removed)?;
189         self.qos.remove_task(uid, task_id);
190 
191         if self.running_queue.cancel_task(task_id, uid) {
192             self.schedule_if_not_scheduled();
193         }
194         database.remove_user_file_task(task_id);
195         let info = database
196             .get_task_info(task_id)
197             .ok_or(ErrorCode::TaskNotFound)?;
198 
199         Notifier::remove(&self.client_manager, info.build_notify_data());
200         Ok(())
201     }
202 
stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode>203     pub(crate) fn stop_task(&mut self, uid: u64, task_id: u32) -> Result<(), ErrorCode> {
204         let database = RequestDb::get_instance();
205         database.change_status(task_id, State::Stopped)?;
206         self.qos.remove_task(uid, task_id);
207 
208         if self.running_queue.cancel_task(task_id, uid) {
209             self.schedule_if_not_scheduled();
210         }
211         Ok(())
212     }
213 
set_max_speed( &mut self, uid: u64, task_id: u32, max_speed: i64, ) -> Result<(), ErrorCode>214     pub(crate) fn set_max_speed(
215         &mut self,
216         uid: u64,
217         task_id: u32,
218         max_speed: i64,
219     ) -> Result<(), ErrorCode> {
220         if let Some(task) = self.running_queue.get_task(uid, task_id) {
221             task.max_speed.store(max_speed, Ordering::SeqCst);
222         }
223         Ok(())
224     }
225 
task_set_mode( &mut self, uid: u64, task_id: u32, mode: Mode, ) -> Result<(), ErrorCode>226     pub(crate) fn task_set_mode(
227         &mut self,
228         uid: u64,
229         task_id: u32,
230         mode: Mode,
231     ) -> Result<(), ErrorCode> {
232         let database = RequestDb::get_instance();
233         database.set_mode(task_id, mode)?;
234 
235         if self.qos.task_set_mode(uid, task_id, mode) {
236             self.schedule_if_not_scheduled();
237         }
238         if let Some(task) = self.running_queue.get_task_clone(uid, task_id) {
239             task.mode.store(mode.repr, Ordering::Release);
240         }
241         if mode == Mode::FrontEnd {
242             NotificationDispatcher::get_instance().unregister_task(uid, task_id, false);
243         } else if mode == Mode::BackGround {
244             NotificationDispatcher::get_instance().enable_task_progress_notification(task_id);
245         }
246         Ok(())
247     }
248 
task_completed(&mut self, uid: u64, task_id: u32)249     pub(crate) fn task_completed(&mut self, uid: u64, task_id: u32) {
250         info!("scheduler task {} completed", task_id);
251         self.running_queue.task_finish(uid, task_id);
252 
253         let database = RequestDb::get_instance();
254         if self.qos.remove_task(uid, task_id) {
255             self.schedule_if_not_scheduled();
256         }
257 
258         if let Some(info) = database.get_task_qos_info(task_id) {
259             if info.state == State::Failed.repr {
260                 if let Some(task_info) = database.get_task_info(task_id) {
261                     Scheduler::notify_fail(task_info, &self.client_manager, Reason::Default);
262                     return;
263                 }
264             }
265 
266             if info.state != State::Running.repr && info.state != State::Waiting.repr {
267                 return;
268             }
269         }
270 
271         database.update_task_state(task_id, State::Completed, Reason::Default);
272         database.remove_user_file_task(task_id);
273         if let Some(info) = database.get_task_info(task_id) {
274             Notifier::complete(&self.client_manager, info.build_notify_data());
275             NotificationDispatcher::get_instance().publish_success_notification(&info);
276         }
277     }
278 
task_cancel( &mut self, uid: u64, task_id: u32, mode: Mode, task_count: &mut HashMap<u64, (usize, usize)>, )279     pub(crate) fn task_cancel(
280         &mut self,
281         uid: u64,
282         task_id: u32,
283         mode: Mode,
284         task_count: &mut HashMap<u64, (usize, usize)>,
285     ) {
286         info!("scheduler task {} canceled", task_id);
287         self.running_queue.task_finish(uid, task_id);
288         if self.running_queue.try_restart(uid, task_id) {
289             return;
290         }
291 
292         let database = RequestDb::get_instance();
293         let Some(info) = database.get_task_info(task_id) else {
294             error!("task {} not found in database", task_id);
295             NotificationDispatcher::get_instance().unregister_task(uid, task_id, true);
296             return;
297         };
298         match State::from(info.progress.common_data.state) {
299             State::Running | State::Retrying => {
300                 info!("task {} waiting for task limits", task_id);
301                 RequestDb::get_instance().update_task_state(
302                     task_id,
303                     State::Waiting,
304                     Reason::RunningTaskMeetLimits,
305                 );
306                 Notifier::waiting(&self.client_manager, task_id, WaitingCause::TaskQueue);
307             }
308             State::Failed => {
309                 info!("task {} cancel with state Failed", task_id);
310                 Scheduler::reduce_task_count(uid, mode, task_count);
311                 let reason = info.common_data.reason;
312                 Scheduler::notify_fail(info, &self.client_manager, Reason::from(reason));
313             }
314             State::Stopped | State::Removed => {
315                 info!("task {} cancel with state Stopped or Removed", task_id);
316                 NotificationDispatcher::get_instance().unregister_task(uid, task_id, true);
317                 self.running_queue.try_restart(uid, task_id);
318             }
319             State::Waiting => {
320                 info!("task {} cancel with state Waiting", task_id);
321                 let reason = match info.common_data.reason {
322                     reason if reason == Reason::AppBackgroundOrTerminate.repr => {
323                         WaitingCause::AppState
324                     }
325                     reason
326                         if reason == Reason::NetworkOffline.repr
327                             || reason == Reason::UnsupportedNetworkType.repr =>
328                     {
329                         WaitingCause::Network
330                     }
331                     reason if reason == Reason::RunningTaskMeetLimits.repr => {
332                         WaitingCause::TaskQueue
333                     }
334                     reason if reason == Reason::AccountStopped.repr => WaitingCause::UserState,
335                     reason => {
336                         error!("task {} cancel with other reason {}", task_id, reason);
337                         WaitingCause::TaskQueue
338                     }
339                 };
340                 Notifier::waiting(&self.client_manager, task_id, reason);
341             }
342             state => {
343                 info!(
344                     "task {} cancel state {:?} reason {:?}",
345                     task_id,
346                     state,
347                     Reason::from(info.common_data.reason)
348                 );
349             }
350         }
351     }
352 
task_failed(&mut self, uid: u64, task_id: u32, reason: Reason)353     pub(crate) fn task_failed(&mut self, uid: u64, task_id: u32, reason: Reason) {
354         info!("scheduler task {} failed", task_id);
355         self.running_queue.task_finish(uid, task_id);
356 
357         let database = RequestDb::get_instance();
358 
359         if self.qos.remove_task(uid, task_id) {
360             self.schedule_if_not_scheduled();
361         }
362 
363         if let Some(info) = database.get_task_qos_info(task_id) {
364             if info.state != State::Running.repr && info.state != State::Waiting.repr {
365                 return;
366             }
367         }
368 
369         database.update_task_state(task_id, State::Failed, reason);
370         if let Some(info) = database.get_task_info(task_id) {
371             let reason = info.common_data.reason;
372             Scheduler::notify_fail(info, &self.client_manager, Reason::from(reason));
373         }
374     }
375 
notify_fail(info: TaskInfo, client_manager: &ClientManagerEntry, reason: Reason)376     fn notify_fail(info: TaskInfo, client_manager: &ClientManagerEntry, reason: Reason) {
377         Notifier::fail(client_manager, info.build_notify_data());
378         Notifier::faults(info.common_data.task_id, client_manager, reason);
379         NotificationDispatcher::get_instance().publish_failed_notification(&info);
380         #[cfg(feature = "oh")]
381         Self::sys_event(info);
382     }
383 
reduce_task_count( uid: u64, mode: Mode, task_count: &mut HashMap<u64, (usize, usize)>, )384     pub(crate) fn reduce_task_count(
385         uid: u64,
386         mode: Mode,
387         task_count: &mut HashMap<u64, (usize, usize)>,
388     ) {
389         if let Some((front, back)) = task_count.get_mut(&uid) {
390             match mode {
391                 Mode::FrontEnd => {
392                     if *front > 0 {
393                         *front -= 1;
394                     }
395                 }
396                 _ => {
397                     if *back > 0 {
398                         *back -= 1;
399                     }
400                 }
401             }
402         }
403     }
404 
405     #[cfg(feature = "oh")]
sys_event(info: TaskInfo)406     pub(crate) fn sys_event(info: TaskInfo) {
407         use crate::sys_event::sys_task_fault;
408 
409         let index = info.progress.common_data.index;
410         let size = info.file_specs.len();
411         let action = match info.action() {
412             Action::Download => "DOWNLOAD",
413             Action::Upload => "UPLOAD",
414             _ => "UNKNOWN",
415         };
416         let reason = Reason::from(info.common_data.reason);
417 
418         sys_task_fault(
419             action,
420             size as i32,
421             (size - index) as i32,
422             index as i32,
423             reason.repr as i32,
424         );
425     }
426 
on_state_change<T, F>(&mut self, f: F, t: T) where F: FnOnce(&mut state::Handler, T) -> Option<SqlList>,427     pub(crate) fn on_state_change<T, F>(&mut self, f: F, t: T)
428     where
429         F: FnOnce(&mut state::Handler, T) -> Option<SqlList>,
430     {
431         let Some(sql_list) = f(&mut self.state_handler, t) else {
432             return;
433         };
434         let db = RequestDb::get_instance();
435         for sql in sql_list {
436             if let Err(e) = db.execute(&sql) {
437                 error!("TaskManager update network failed {:?}", e);
438             };
439         }
440         self.reload_all_tasks();
441     }
442 
reload_all_tasks(&mut self)443     pub(crate) fn reload_all_tasks(&mut self) {
444         self.qos.reload_all_tasks();
445         self.schedule_if_not_scheduled();
446     }
447 
on_rss_change(&mut self, level: i32)448     pub(crate) fn on_rss_change(&mut self, level: i32) {
449         if let Some(new_rss) = self.state_handler.update_rss_level(level) {
450             self.qos.change_rss(new_rss);
451             self.schedule_if_not_scheduled();
452         }
453     }
454 
schedule_if_not_scheduled(&mut self)455     fn schedule_if_not_scheduled(&mut self) {
456         if self.resort_scheduled {
457             return;
458         }
459         self.resort_scheduled = true;
460         let task_manager = self.task_manager.clone();
461         task_manager.send_event(TaskManagerEvent::Reschedule);
462     }
463 
reschedule(&mut self)464     pub(crate) fn reschedule(&mut self) {
465         self.resort_scheduled = false;
466         let changes = self.qos.reschedule(&self.state_handler);
467         let mut qos_remove_queue = vec![];
468         self.running_queue
469             .reschedule(changes, &mut qos_remove_queue);
470         for (uid, task_id) in qos_remove_queue.iter() {
471             self.qos.apps.remove_task(*uid, *task_id);
472         }
473         if !qos_remove_queue.is_empty() {
474             self.reload_all_tasks();
475         }
476     }
477 
check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode>478     pub(crate) fn check_config_satisfy(&self, task_id: u32) -> Result<bool, ErrorCode> {
479         let database = RequestDb::get_instance();
480         let config = database
481             .get_task_config(task_id)
482             .ok_or(ErrorCode::TaskNotFound)?;
483 
484         if let Err(reason) = config.satisfy_network(self.state_handler.network()) {
485             info!(
486                 "task {} started, waiting for network {:?}",
487                 task_id,
488                 self.state_handler.network()
489             );
490 
491             database.update_task_state(task_id, State::Waiting, reason);
492             Notifier::waiting(&self.client_manager, task_id, WaitingCause::Network);
493             return Ok(false);
494         }
495 
496         if !config.satisfy_foreground(self.state_handler.foreground_abilities()) {
497             info!(
498                 "task {} started, waiting for app {}",
499                 task_id, config.common_data.uid
500             );
501             database.update_task_state(task_id, State::Waiting, Reason::AppBackgroundOrTerminate);
502             Notifier::waiting(&self.client_manager, task_id, WaitingCause::AppState);
503             return Ok(false);
504         }
505         Ok(true)
506     }
507 
clear_timeout_tasks(&mut self)508     pub(crate) fn clear_timeout_tasks(&mut self) {
509         let current_time = get_current_timestamp();
510         let timeout_tasks = self
511             .tasks()
512             .filter(|task| current_time - task.ctime > MILLISECONDS_IN_ONE_MONTH)
513             .cloned()
514             .collect::<Vec<_>>();
515         if timeout_tasks.is_empty() {
516             return;
517         }
518         let database = RequestDb::get_instance();
519         for task in timeout_tasks {
520             if database
521                 .change_status(task.task_id(), State::Stopped)
522                 .is_ok()
523             {
524                 self.qos.apps.remove_task(task.uid(), task.task_id());
525             }
526         }
527         self.schedule_if_not_scheduled();
528     }
529 
retry_all_tasks(&mut self)530     pub(crate) fn retry_all_tasks(&mut self) {
531         self.running_queue.retry_all_tasks();
532     }
533 }
534 
535 impl RequestDb {
change_status(&self, task_id: u32, new_state: State) -> Result<(), ErrorCode>536     fn change_status(&self, task_id: u32, new_state: State) -> Result<(), ErrorCode> {
537         let info = RequestDb::get_instance()
538             .get_task_info(task_id)
539             .ok_or(ErrorCode::TaskNotFound)?;
540 
541         let old_state = info.progress.common_data.state;
542         if old_state == new_state.repr {
543             if new_state == State::Removed {
544                 return Err(ErrorCode::TaskNotFound);
545             } else {
546                 return Err(ErrorCode::TaskStateErr);
547             }
548         }
549         let sql = match new_state {
550             State::Paused => sql::pause_task(task_id),
551             State::Running => sql::start_task(task_id),
552             State::Stopped => sql::stop_task(task_id),
553             State::Removed => sql::remove_task(task_id),
554             State::Waiting => sql::start_task(task_id),
555             _ => return Err(ErrorCode::Other),
556         };
557 
558         RequestDb::get_instance()
559             .execute(&sql)
560             .map_err(|_| ErrorCode::SystemApi)?;
561 
562         let info = RequestDb::get_instance()
563             .get_task_info(task_id)
564             .ok_or(ErrorCode::SystemApi)?;
565         if info.progress.common_data.state != new_state.repr {
566             return Err(ErrorCode::TaskStateErr);
567         }
568 
569         if (old_state == State::Initialized.repr
570             || old_state == State::Waiting.repr
571             || old_state == State::Paused.repr)
572             && (new_state == State::Stopped || new_state == State::Removed)
573         {
574             NotificationDispatcher::get_instance().unregister_task(info.uid(), task_id, true);
575         }
576         Ok(())
577     }
578 
set_mode(&self, task_id: u32, mode: Mode) -> Result<(), ErrorCode>579     fn set_mode(&self, task_id: u32, mode: Mode) -> Result<(), ErrorCode> {
580         let info = RequestDb::get_instance()
581             .get_task_info(task_id)
582             .ok_or(ErrorCode::TaskNotFound)?;
583         let old_mode = info.common_data.mode;
584         if old_mode == mode.repr {
585             return Ok(());
586         }
587         let sql = sql::task_set_mode(task_id, mode);
588         RequestDb::get_instance()
589             .execute(&sql)
590             .map_err(|_| ErrorCode::SystemApi)?;
591         let info = RequestDb::get_instance()
592             .get_task_info(task_id)
593             .ok_or(ErrorCode::SystemApi)?;
594         if info.common_data.mode != mode.repr {
595             return Err(ErrorCode::TaskStateErr);
596         }
597         Ok(())
598     }
599 }
600