• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::HashMap;
15 use std::ops::{Deref, DerefMut};
16 use std::time::Duration;
17 
18 use samgr::definition::COMM_NET_CONN_MANAGER_SYS_ABILITY_ID;
19 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
20 use ylong_runtime::sync::oneshot;
21 use ylong_runtime::time::sleep;
22 
23 cfg_oh! {
24     use samgr::manage::SystemAbilityManager;
25     use crate::ability::PANIC_INFO;
26     use crate::manage::account::registry_account_subscribe;
27 }
28 use super::account::{remove_account_tasks, AccountEvent};
29 use super::database::RequestDb;
30 use super::events::{
31     QueryEvent, ScheduleEvent, ServiceEvent, StateEvent, TaskEvent, TaskManagerEvent,
32 };
33 use crate::config::Action;
34 use crate::error::ErrorCode;
35 use crate::info::TaskInfo;
36 use crate::manage::network::register_network_change;
37 use crate::manage::network_manager::NetworkManager;
38 use crate::manage::scheduler::state::Handler;
39 use crate::manage::scheduler::Scheduler;
40 use crate::service::client::ClientManagerEntry;
41 use crate::service::notification_bar::subscribe_notification_bar;
42 use crate::service::run_count::RunCountManagerEntry;
43 use crate::utils::runtime_spawn;
44 
/// Interval, in seconds, between two `ScheduleEvent::ClearTimeoutTasks` events.
const CLEAR_INTERVAL: u64 = 30 * 60;
/// Interval, in seconds, between two `ScheduleEvent::LogTasks` events.
const LOG_INTERVAL: u64 = 5 * 60;
/// Delay, in seconds, before the single `ScheduleEvent::RestoreAllTasks`
/// event is sent after initialization.
const RESTORE_ALL_TASKS_INTERVAL: u64 = 10;
48 
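// TaskManager initialization logic:
//
// First, determine where tasks come from: 1) tasks submitted by applications;
// 2) unfinished tasks stored in the database.
// Second, determine when the SA is started: 1) started by a WIFI connection;
// 2) started by an application.

// QoS scheduling steps:
// 1. When the SA starts, tasks in the Waiting + QosWaiting states (their QoS
//    information) and the related application information are read from the
//    database and stored, sorted, in the QoS structure. This triggers one
//    initial task load.
// 2. When a new task is added on the SA side, or the network state or the
//    foreground/background state changes, the QoS is updated and re-sorted
//    and a task load is triggered: runnable tasks are loaded into memory for
//    processing, and non-runnable tasks are returned to the database.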
60 
/// Owns the task scheduler and the receiving half of the `TaskManagerEvent`
/// channel; its event loop dispatches every received event.
pub(crate) struct TaskManager {
    /// Scheduler that is notified about state changes and task results.
    pub(crate) scheduler: Scheduler,
    /// Receiving half of the event channel.
    pub(crate) rx: TaskManagerRx,
    /// Client manager entry; a clone of it is handed to the scheduler in
    /// `TaskManager::new`.
    pub(crate) client_manager: ClientManagerEntry,
    /// Task counters: the first `usize` is for foreground tasks, the second
    /// for background tasks.
    // NOTE(review): the `u64` key is presumably the application uid — confirm.
    pub(crate) task_count: HashMap<u64, (usize, usize)>,
}
68 
69 impl TaskManager {
init( runcount_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, #[cfg(not(feature = "oh"))] network: Network, ) -> TaskManagerTx70     pub(crate) fn init(
71         runcount_manager: RunCountManagerEntry,
72         client_manager: ClientManagerEntry,
73         #[cfg(not(feature = "oh"))] network: Network,
74     ) -> TaskManagerTx {
75         debug!("TaskManager init");
76 
77         let (tx, rx) = unbounded_channel();
78         let tx = TaskManagerTx::new(tx);
79         let rx = TaskManagerRx::new(rx);
80 
81         #[cfg(feature = "oh")]
82         registry_account_subscribe(tx.clone());
83 
84         #[cfg(feature = "oh")]
85         {
86             let mut network_manager = NetworkManager::get_instance().lock().unwrap();
87             network_manager.tx = Some(tx.clone());
88             SystemAbilityManager::subscribe_system_ability(
89                 COMM_NET_CONN_MANAGER_SYS_ABILITY_ID,
90                 |_, _| {
91                     register_network_change();
92                 },
93                 |_, _| {
94                     info!("network service died");
95                 },
96             );
97         }
98         #[cfg(feature = "oh")]
99         register_network_change();
100         subscribe_notification_bar(tx.clone());
101         let task_manager = Self::new(tx.clone(), rx, runcount_manager, client_manager);
102 
103         // Performance optimization tips for task restoring:
104         //
105         // When SA is initializing, it will create and initialize an app sorting
106         // queue in `scheduler.QoS`, but there is no task rescheduling or
107         // execution at this time.
108         //
109         // After SA initialization, we will start a coroutine to recover all
110         // tasks, which is used to notify `TaskManager` to recover waiting tasks
111         // in the database.
112         //
113         // If a new task is started at this time, this future can
114         // be removed because the scheduler will also be rearranged in the
115         // startup logic of the new task.
116         runtime_spawn(restore_all_tasks(tx.clone()));
117 
118         runtime_spawn(clear_timeout_tasks(tx.clone()));
119         runtime_spawn(log_all_task_info(tx.clone()));
120         runtime_spawn(task_manager.run());
121         tx
122     }
123 
new( tx: TaskManagerTx, rx: TaskManagerRx, run_count_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, ) -> Self124     pub(crate) fn new(
125         tx: TaskManagerTx,
126         rx: TaskManagerRx,
127         run_count_manager: RunCountManagerEntry,
128         client_manager: ClientManagerEntry,
129     ) -> Self {
130         Self {
131             scheduler: Scheduler::init(tx.clone(), run_count_manager, client_manager.clone()),
132             rx,
133             client_manager,
134             task_count: HashMap::new(),
135         }
136     }
137 
run(mut self)138     async fn run(mut self) {
139         let db = RequestDb::get_instance();
140         db.clear_invalid_records();
141         loop {
142             let event = match self.rx.recv().await {
143                 Ok(event) => event,
144                 Err(e) => {
145                     error!("TaskManager receives error {:?}", e);
146                     continue;
147                 }
148             };
149 
150             match event {
151                 TaskManagerEvent::Service(event) => self.handle_service_event(event),
152                 TaskManagerEvent::State(event) => self.handle_state_event(event),
153                 TaskManagerEvent::Task(event) => self.handle_task_event(event),
154                 TaskManagerEvent::Schedule(event) => {
155                     if self.handle_schedule_event(event) {
156                         info!("TaskManager unload ok");
157                         // If unload_sa success, can not breaks this loop.
158                     }
159                 }
160                 TaskManagerEvent::Device(level) => {
161                     self.scheduler.on_rss_change(level);
162                 }
163                 TaskManagerEvent::Account(event) => self.handle_account_event(event),
164                 TaskManagerEvent::Query(query) => self.handle_query_event(query),
165                 TaskManagerEvent::Reschedule => self.scheduler.reschedule(),
166             }
167 
168             debug!("TaskManager handles events finished");
169         }
170     }
171 
handle_account_event(&mut self, event: AccountEvent)172     pub(crate) fn handle_account_event(&mut self, event: AccountEvent) {
173         match event {
174             AccountEvent::Remove(user_id) => remove_account_tasks(user_id),
175             AccountEvent::Changed => self.scheduler.on_state_change(Handler::update_account, ()),
176         }
177     }
178 
handle_service_event(&mut self, event: ServiceEvent)179     fn handle_service_event(&mut self, event: ServiceEvent) {
180         debug!("TaskManager handles service event {:?}", event);
181 
182         match event {
183             ServiceEvent::Construct(msg, tx) => {
184                 let _ = tx.send(self.create(msg.config));
185             }
186             ServiceEvent::Start(uid, task_id, tx) => {
187                 let _ = tx.send(self.start(uid, task_id));
188             }
189             ServiceEvent::Stop(uid, task_id, tx) => {
190                 let _ = tx.send(self.stop(uid, task_id));
191             }
192             ServiceEvent::Pause(uid, task_id, tx) => {
193                 let _ = tx.send(self.pause(uid, task_id));
194             }
195             ServiceEvent::Resume(uid, task_id, tx) => {
196                 let _ = tx.send(self.resume(uid, task_id));
197             }
198             ServiceEvent::Remove(uid, task_id, tx) => {
199                 let _ = tx.send(self.remove(uid, task_id));
200             }
201             ServiceEvent::DumpAll(tx) => {
202                 let _ = tx.send(self.query_all_task());
203             }
204             ServiceEvent::DumpOne(task_id, tx) => {
205                 let _ = tx.send(self.query_one_task(task_id));
206             }
207             ServiceEvent::AttachGroup(uid, task_ids, group, tx) => {
208                 let _ = tx.send(self.attach_group(uid, task_ids, group));
209             }
210         }
211     }
212 
handle_state_event(&mut self, event: StateEvent)213     fn handle_state_event(&mut self, event: StateEvent) {
214         debug!("TaskManager handles state event {:?}", event);
215 
216         match event {
217             StateEvent::Network => {
218                 self.scheduler.retry_all_tasks();
219                 self.scheduler.on_state_change(Handler::update_network, ());
220             }
221 
222             StateEvent::ForegroundApp(uid) => {
223                 self.scheduler.on_state_change(Handler::update_top_uid, uid);
224             }
225             StateEvent::Background(uid) => self
226                 .scheduler
227                 .on_state_change(Handler::update_background, uid),
228             StateEvent::BackgroundTimeout(uid) => self
229                 .scheduler
230                 .on_state_change(Handler::update_background_timeout, uid),
231             StateEvent::SpecialTerminate(uid) => {
232                 self.scheduler
233                     .on_state_change(Handler::special_process_terminate, uid);
234             }
235         }
236     }
237 
handle_task_event(&mut self, event: TaskEvent)238     fn handle_task_event(&mut self, event: TaskEvent) {
239         debug!("TaskManager handles task event {:?}", event);
240 
241         match event {
242             TaskEvent::Subscribe(task_id, token_id, tx) => {
243                 let _ = tx.send(self.check_subscriber(task_id, token_id));
244             }
245             TaskEvent::Completed(task_id, uid, mode) => {
246                 Scheduler::reduce_task_count(uid, mode, &mut self.task_count);
247                 self.scheduler.task_completed(uid, task_id);
248             }
249             TaskEvent::Running(task_id, uid, mode) => {
250                 self.scheduler
251                     .task_cancel(uid, task_id, mode, &mut self.task_count);
252             }
253             TaskEvent::Failed(task_id, uid, reason, mode) => {
254                 Scheduler::reduce_task_count(uid, mode, &mut self.task_count);
255                 self.scheduler.task_failed(uid, task_id, reason);
256             }
257             TaskEvent::Offline(task_id, uid, mode) => {
258                 self.scheduler
259                     .task_cancel(uid, task_id, mode, &mut self.task_count);
260             }
261         };
262     }
263 
handle_schedule_event(&mut self, message: ScheduleEvent) -> bool264     fn handle_schedule_event(&mut self, message: ScheduleEvent) -> bool {
265         debug!("TaskManager handle scheduled_message {:?}", message);
266 
267         match message {
268             ScheduleEvent::ClearTimeoutTasks => self.clear_timeout_tasks(),
269             ScheduleEvent::LogTasks => self.log_all_task_info(),
270             ScheduleEvent::RestoreAllTasks => self.restore_all_tasks(),
271             ScheduleEvent::Unload => return self.unload_sa(),
272         }
273         false
274     }
275 
check_subscriber(&self, task_id: u32, token_id: u64) -> ErrorCode276     fn check_subscriber(&self, task_id: u32, token_id: u64) -> ErrorCode {
277         match RequestDb::get_instance().query_task_token_id(task_id) {
278             Ok(id) if id == token_id => ErrorCode::ErrOk,
279             Ok(_) => ErrorCode::Permission,
280             Err(_) => ErrorCode::TaskNotFound,
281         }
282     }
283 
    /// Asks the scheduler to clear its timed-out tasks.
    fn clear_timeout_tasks(&mut self) {
        self.scheduler.clear_timeout_tasks();
    }
287 
    /// Asks the scheduler to dump its tasks.
    fn log_all_task_info(&self) {
        self.scheduler.dump_tasks();
    }
291 
    /// Asks the scheduler to restore all tasks.
    fn restore_all_tasks(&mut self) {
        self.scheduler.restore_all_tasks();
    }
295 
unload_sa(&mut self) -> bool296     fn unload_sa(&mut self) -> bool {
297         const REQUEST_SERVICE_ID: i32 = 3706;
298 
299         if !self.rx.is_empty() {
300             return false;
301         }
302 
303         let running_tasks = self.scheduler.running_tasks();
304         if running_tasks != 0 {
305             info!("running {} tasks when unload SA", running_tasks,);
306             return false;
307         }
308 
309         // check rx again for there may be new message arrive.
310         if !self.rx.is_empty() {
311             return false;
312         }
313 
314         info!("unload SA");
315 
316         // failed logic?
317         #[cfg(feature = "oh")]
318         let _ = SystemAbilityManager::unload_system_ability(REQUEST_SERVICE_ID);
319 
320         true
321     }
322 }
323 
/// Cloneable sending half of the `TaskManagerEvent` channel.
// NOTE(review): `unreachable_pub` is silenced because the type is `pub`
// while its members are `pub(crate)`; the reason `pub` is needed is not
// visible here — confirm.
#[allow(unreachable_pub)]
#[derive(Clone)]
pub struct TaskManagerTx {
    /// Underlying unbounded sender.
    pub(crate) tx: UnboundedSender<TaskManagerEvent>,
}
329 
330 impl TaskManagerTx {
    /// Wraps an unbounded sender of `TaskManagerEvent`.
    pub(crate) fn new(tx: UnboundedSender<TaskManagerEvent>) -> Self {
        Self { tx }
    }
334 
send_event(&self, event: TaskManagerEvent) -> bool335     pub(crate) fn send_event(&self, event: TaskManagerEvent) -> bool {
336         if self.tx.send(event).is_err() {
337             #[cfg(feature = "oh")]
338             unsafe {
339                 if let Some(e) = PANIC_INFO.as_ref() {
340                     error!("Sends TaskManager event failed {}", e);
341                 } else {
342                     info!("TaskManager is unloading");
343                 }
344             }
345             return false;
346         }
347         true
348     }
349 
notify_foreground_app_change(&self, uid: u64)350     pub(crate) fn notify_foreground_app_change(&self, uid: u64) {
351         let _ = self.send_event(TaskManagerEvent::State(StateEvent::ForegroundApp(uid)));
352     }
353 
notify_app_background(&self, uid: u64)354     pub(crate) fn notify_app_background(&self, uid: u64) {
355         let _ = self.send_event(TaskManagerEvent::State(StateEvent::Background(uid)));
356     }
357 
trigger_background_timeout(&self, uid: u64)358     pub(crate) fn trigger_background_timeout(&self, uid: u64) {
359         let _ = self.send_event(TaskManagerEvent::State(StateEvent::BackgroundTimeout(uid)));
360     }
361 
notify_special_process_terminate(&self, uid: u64)362     pub(crate) fn notify_special_process_terminate(&self, uid: u64) {
363         let _ = self.send_event(TaskManagerEvent::State(StateEvent::SpecialTerminate(uid)));
364     }
365 
show(&self, uid: u64, task_id: u32) -> Option<TaskInfo>366     pub(crate) fn show(&self, uid: u64, task_id: u32) -> Option<TaskInfo> {
367         let (tx, rx) = oneshot::channel();
368         let event = QueryEvent::Show(task_id, uid, tx);
369         let _ = self.send_event(TaskManagerEvent::Query(event));
370         ylong_runtime::block_on(rx).unwrap()
371     }
372 
query(&self, task_id: u32, action: Action) -> Option<TaskInfo>373     pub(crate) fn query(&self, task_id: u32, action: Action) -> Option<TaskInfo> {
374         let (tx, rx) = oneshot::channel();
375         let event = QueryEvent::Query(task_id, action, tx);
376         let _ = self.send_event(TaskManagerEvent::Query(event));
377         ylong_runtime::block_on(rx).unwrap()
378     }
379 
touch(&self, uid: u64, task_id: u32, token: String) -> Option<TaskInfo>380     pub(crate) fn touch(&self, uid: u64, task_id: u32, token: String) -> Option<TaskInfo> {
381         let (tx, rx) = oneshot::channel();
382         let event = QueryEvent::Touch(task_id, uid, token, tx);
383         let _ = self.send_event(TaskManagerEvent::Query(event));
384         ylong_runtime::block_on(rx).unwrap()
385     }
386 }
387 
/// Receiving half of the `TaskManagerEvent` channel.
pub(crate) struct TaskManagerRx {
    /// Underlying unbounded receiver.
    rx: UnboundedReceiver<TaskManagerEvent>,
}
391 
impl TaskManagerRx {
    /// Wraps an unbounded receiver of `TaskManagerEvent`.
    pub(crate) fn new(rx: UnboundedReceiver<TaskManagerEvent>) -> Self {
        Self { rx }
    }
}
397 
/// Exposes the wrapped receiver's `&self` methods directly on `TaskManagerRx`.
impl Deref for TaskManagerRx {
    type Target = UnboundedReceiver<TaskManagerEvent>;

    /// Returns a shared reference to the wrapped receiver.
    fn deref(&self) -> &Self::Target {
        &self.rx
    }
}
405 
/// Exposes the wrapped receiver's `&mut self` methods directly on
/// `TaskManagerRx`.
impl DerefMut for TaskManagerRx {
    /// Returns a mutable reference to the wrapped receiver.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.rx
    }
}
411 
restore_all_tasks(tx: TaskManagerTx)412 async fn restore_all_tasks(tx: TaskManagerTx) {
413     sleep(Duration::from_secs(RESTORE_ALL_TASKS_INTERVAL)).await;
414     let _ = tx.send_event(TaskManagerEvent::Schedule(ScheduleEvent::RestoreAllTasks));
415 }
416 
clear_timeout_tasks(tx: TaskManagerTx)417 async fn clear_timeout_tasks(tx: TaskManagerTx) {
418     loop {
419         sleep(Duration::from_secs(CLEAR_INTERVAL)).await;
420         let _ = tx.send_event(TaskManagerEvent::Schedule(ScheduleEvent::ClearTimeoutTasks));
421     }
422 }
423 
log_all_task_info(tx: TaskManagerTx)424 async fn log_all_task_info(tx: TaskManagerTx) {
425     loop {
426         sleep(Duration::from_secs(LOG_INTERVAL)).await;
427         let _ = tx.send_event(TaskManagerEvent::Schedule(ScheduleEvent::LogTasks));
428     }
429 }
430