• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 //     http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13 
14 use std::collections::HashMap;
15 use std::ops::{Deref, DerefMut};
16 use std::time::Duration;
17 
18 use samgr::definition::COMM_NET_CONN_MANAGER_SYS_ABILITY_ID;
19 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
20 use ylong_runtime::sync::oneshot;
21 use ylong_runtime::time::sleep;
22 
23 cfg_oh! {
24     use samgr::manage::SystemAbilityManager;
25     use crate::ability::PANIC_INFO;
26     use crate::manage::account::registry_account_subscribe;
27 }
28 use super::account::{remove_account_tasks, AccountEvent};
29 use super::database::RequestDb;
30 use super::events::{
31     QueryEvent, ScheduleEvent, ServiceEvent, StateEvent, TaskEvent, TaskManagerEvent,
32 };
33 use crate::config::{Action, Mode};
34 use crate::database::clear_database_part;
35 use crate::error::ErrorCode;
36 use crate::info::{State, TaskInfo};
37 use crate::manage::app_state::AppUninstallSubscriber;
38 use crate::manage::network::register_network_change;
39 use crate::manage::network_manager::NetworkManager;
40 use crate::manage::query::TaskFilter;
41 use crate::manage::scheduler::state::Handler;
42 use crate::manage::scheduler::Scheduler;
43 use crate::service::active_counter::ActiveCounter;
44 use crate::service::client::ClientManagerEntry;
45 use crate::service::notification_bar::{subscribe_notification_bar, NotificationDispatcher};
46 use crate::service::run_count::RunCountManagerEntry;
47 use crate::utils::task_event_count::{task_complete_add, task_fail_add, task_unload};
48 use crate::utils::{get_current_timestamp, runtime_spawn, subscribe_common_event, update_policy};
49 
/// Delay, in seconds, between two consecutive `ClearTimeoutTasks` events
/// (30 minutes).
const CLEAR_INTERVAL: u64 = 30 * 60;

/// Delay, in seconds, between start-up and the single `RestoreAllTasks`
/// event.
const RESTORE_ALL_TASKS_INTERVAL: u64 = 10;
52 
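// TaskManager initialization logic:
//
// First, determine where tasks come from: 1) tasks submitted by applications;
// 2) unfinished tasks stored in the database.
// Second, determine when the SA is started: 1) started by a Wi-Fi connection;
// 2) started by an application.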
57 
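// QoS scheduling steps:
// 1. When the SA starts, the tasks in the Waiting and QosWaiting states (their
//    QoS information) and the application information are read from the
//    database, stored in the Qos structure and sorted. An initial task load is
//    triggered at this point.
// 2. When a new task is added to the SA, the network state changes, or an
//    application moves between foreground and background, Qos is updated and
//    re-sorted and a task load is triggered: runnable tasks are loaded into
//    memory for processing, and non-runnable tasks are returned to the
//    database.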
64 
65 pub(crate) struct TaskManager {
66     pub(crate) scheduler: Scheduler,
67     pub(crate) rx: TaskManagerRx,
68     pub(crate) client_manager: ClientManagerEntry,
69     // first usize for foreground , seconde for background
70     pub(crate) task_count: HashMap<u64, (usize, usize)>,
71 }
72 
73 impl TaskManager {
init( runcount_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, active_counter: ActiveCounter, #[cfg(not(feature = "oh"))] network: Network, ) -> TaskManagerTx74     pub(crate) fn init(
75         runcount_manager: RunCountManagerEntry,
76         client_manager: ClientManagerEntry,
77         active_counter: ActiveCounter,
78         #[cfg(not(feature = "oh"))] network: Network,
79     ) -> TaskManagerTx {
80         debug!("TaskManager init");
81 
82         let (tx, rx) = unbounded_channel();
83         let tx = TaskManagerTx::new(tx);
84         let rx = TaskManagerRx::new(rx);
85 
86         #[cfg(feature = "oh")]
87         registry_account_subscribe(tx.clone());
88 
89         #[cfg(feature = "oh")]
90         {
91             let mut network_manager = NetworkManager::get_instance().lock().unwrap();
92             network_manager.tx = Some(tx.clone());
93             SystemAbilityManager::subscribe_system_ability(
94                 COMM_NET_CONN_MANAGER_SYS_ABILITY_ID,
95                 |_, _| {
96                     register_network_change();
97                 },
98                 |_, _| {
99                     info!("network service died");
100                 },
101             );
102         }
103         #[cfg(feature = "oh")]
104         register_network_change();
105         subscribe_notification_bar(tx.clone());
106 
107         if let Err(e) = subscribe_common_event(
108             vec![
109                 "usual.event.PACKAGE_REMOVED",
110                 "usual.event.BUNDLE_REMOVED",
111                 "usual.event.PACKAGE_FULLY_REMOVED",
112             ],
113             AppUninstallSubscriber::new(tx.clone()),
114         ) {
115             error!("Subscribe app uninstall event failed: {}", e);
116             sys_event!(
117                 ExecFault,
118                 DfxCode::EVENT_FAULT_01,
119                 &format!("Subscribe app uninstall event failed: {}", e)
120             );
121         }
122 
123         let task_manager = Self::new(
124             tx.clone(),
125             rx,
126             runcount_manager,
127             client_manager,
128             active_counter,
129         );
130 
131         // Performance optimization tips for task restoring:
132         //
133         // When SA is initializing, it will create and initialize an app sorting
134         // queue in `scheduler.QoS`, but there is no task rescheduling or
135         // execution at this time.
136         //
137         // After SA initialization, we will start a coroutine to recover all
138         // tasks, which is used to notify `TaskManager` to recover waiting tasks
139         // in the database.
140         //
141         // If a new task is started at this time, this future can
142         // be removed because the scheduler will also be rearranged in the
143         // startup logic of the new task.
144         runtime_spawn(restore_all_tasks(tx.clone()));
145 
146         runtime_spawn(clear_timeout_tasks(tx.clone()));
147         runtime_spawn(task_manager.run());
148         tx
149     }
150 
new( tx: TaskManagerTx, rx: TaskManagerRx, run_count_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, active_counter: ActiveCounter, ) -> Self151     pub(crate) fn new(
152         tx: TaskManagerTx,
153         rx: TaskManagerRx,
154         run_count_manager: RunCountManagerEntry,
155         client_manager: ClientManagerEntry,
156         active_counter: ActiveCounter,
157     ) -> Self {
158         Self {
159             scheduler: Scheduler::init(
160                 tx.clone(),
161                 run_count_manager,
162                 client_manager.clone(),
163                 active_counter,
164             ),
165             rx,
166             client_manager,
167             task_count: HashMap::new(),
168         }
169     }
170 
run(mut self)171     async fn run(mut self) {
172         let db = RequestDb::get_instance();
173         db.clear_invalid_records();
174         loop {
175             let event = match self.rx.recv().await {
176                 Ok(event) => event,
177                 Err(e) => {
178                     error!("TaskManager receives error {:?}", e);
179                     continue;
180                 }
181             };
182 
183             match event {
184                 TaskManagerEvent::Service(event) => self.handle_service_event(event),
185                 TaskManagerEvent::State(event) => self.handle_state_event(event),
186                 TaskManagerEvent::Task(event) => self.handle_task_event(event),
187                 TaskManagerEvent::Schedule(event) => {
188                     if self.handle_schedule_event(event) {
189                         info!("TaskManager unload ok");
190                         // If unload_sa success, can not breaks this loop.
191                     }
192                 }
193                 TaskManagerEvent::Device(level) => {
194                     self.scheduler.on_rss_change(level);
195                 }
196                 TaskManagerEvent::Account(event) => self.handle_account_event(event),
197                 TaskManagerEvent::Query(query) => self.handle_query_event(query),
198                 TaskManagerEvent::Reschedule => self.scheduler.reschedule(),
199             }
200 
201             debug!("TaskManager handles events finished");
202         }
203     }
204 
handle_account_event(&mut self, event: AccountEvent)205     pub(crate) fn handle_account_event(&mut self, event: AccountEvent) {
206         match event {
207             AccountEvent::Remove(user_id) => remove_account_tasks(user_id),
208             AccountEvent::Changed => self.scheduler.on_state_change(Handler::update_account, ()),
209         }
210     }
211 
handle_service_event(&mut self, event: ServiceEvent)212     fn handle_service_event(&mut self, event: ServiceEvent) {
213         debug!("TaskManager handles service event {:?}", event);
214 
215         match event {
216             ServiceEvent::Construct(msg, tx) => {
217                 let _ = tx.send(self.create(msg.config));
218             }
219             ServiceEvent::Start(uid, task_id, tx) => {
220                 let _ = tx.send(self.start(uid, task_id));
221             }
222             ServiceEvent::Stop(uid, task_id, tx) => {
223                 let _ = tx.send(self.stop(uid, task_id));
224             }
225             ServiceEvent::Pause(uid, task_id, tx) => {
226                 let _ = tx.send(self.pause(uid, task_id));
227             }
228             ServiceEvent::Resume(uid, task_id, tx) => {
229                 let _ = tx.send(self.resume(uid, task_id));
230             }
231             ServiceEvent::Remove(uid, task_id, tx) => {
232                 let _ = tx.send(self.remove(uid, task_id));
233             }
234             ServiceEvent::SetMaxSpeed(uid, task_id, max_speed, tx) => {
235                 let _ = tx.send(self.set_max_speed(uid, task_id, max_speed));
236             }
237             ServiceEvent::DumpAll(tx) => {
238                 let _ = tx.send(self.query_all_task());
239             }
240             ServiceEvent::DumpOne(task_id, tx) => {
241                 let _ = tx.send(self.query_one_task(task_id));
242             }
243             ServiceEvent::AttachGroup(uid, task_ids, group, tx) => {
244                 let _ = tx.send(self.attach_group(uid, task_ids, group));
245             }
246             ServiceEvent::SetMode(uid, task_id, mode, tx) => {
247                 let _ = tx.send(self.set_mode(uid, task_id, mode));
248             }
249         }
250     }
251 
handle_state_event(&mut self, event: StateEvent)252     fn handle_state_event(&mut self, event: StateEvent) {
253         debug!("TaskManager handles state event {:?}", event);
254 
255         match event {
256             StateEvent::Network => {
257                 self.scheduler.retry_all_tasks();
258                 self.scheduler.on_state_change(Handler::update_network, ());
259             }
260 
261             StateEvent::ForegroundApp(uid) => {
262                 self.scheduler.on_state_change(Handler::update_top_uid, uid);
263             }
264             StateEvent::Background(uid) => self
265                 .scheduler
266                 .on_state_change(Handler::update_background, uid),
267             StateEvent::BackgroundTimeout(uid) => self
268                 .scheduler
269                 .on_state_change(Handler::update_background_timeout, uid),
270             StateEvent::AppUninstall(uid) => {
271                 self.scheduler.on_state_change(Handler::app_uninstall, uid);
272             }
273             StateEvent::SpecialTerminate(uid) => {
274                 self.scheduler
275                     .on_state_change(Handler::special_process_terminate, uid);
276             }
277         }
278     }
279 
handle_task_event(&mut self, event: TaskEvent)280     fn handle_task_event(&mut self, event: TaskEvent) {
281         debug!("TaskManager handles task event {:?}", event);
282 
283         match event {
284             TaskEvent::Subscribe(task_id, token_id, tx) => {
285                 let _ = tx.send(self.check_subscriber(task_id, token_id));
286             }
287             TaskEvent::Completed(task_id, uid, mode) => {
288                 Scheduler::reduce_task_count(uid, mode, &mut self.task_count);
289                 task_complete_add();
290                 self.scheduler.task_completed(uid, task_id);
291             }
292             TaskEvent::Running(task_id, uid, mode) => {
293                 self.scheduler
294                     .task_cancel(uid, task_id, mode, &mut self.task_count);
295             }
296             TaskEvent::Failed(task_id, uid, reason, mode) => {
297                 Scheduler::reduce_task_count(uid, mode, &mut self.task_count);
298                 task_fail_add();
299                 self.scheduler.task_failed(uid, task_id, reason);
300             }
301             TaskEvent::Offline(task_id, uid, mode) => {
302                 self.scheduler
303                     .task_cancel(uid, task_id, mode, &mut self.task_count);
304             }
305         };
306     }
307 
handle_schedule_event(&mut self, message: ScheduleEvent) -> bool308     fn handle_schedule_event(&mut self, message: ScheduleEvent) -> bool {
309         debug!("TaskManager handle scheduled_message {:?}", message);
310 
311         match message {
312             ScheduleEvent::ClearTimeoutTasks => self.clear_timeout_tasks(),
313             ScheduleEvent::RestoreAllTasks => self.restore_all_tasks(),
314             ScheduleEvent::Unload => return self.unload_sa(),
315         }
316         false
317     }
318 
check_subscriber(&self, task_id: u32, token_id: u64) -> ErrorCode319     fn check_subscriber(&self, task_id: u32, token_id: u64) -> ErrorCode {
320         match RequestDb::get_instance().query_task_token_id(task_id) {
321             Ok(id) if id == token_id => ErrorCode::ErrOk,
322             Ok(_) => ErrorCode::Permission,
323             Err(_) => ErrorCode::TaskNotFound,
324         }
325     }
326 
clear_timeout_tasks(&mut self)327     fn clear_timeout_tasks(&mut self) {
328         self.scheduler.clear_timeout_tasks();
329     }
330 
restore_all_tasks(&mut self)331     fn restore_all_tasks(&mut self) {
332         self.scheduler.restore_all_tasks();
333     }
334 
check_any_tasks(&self) -> bool335     fn check_any_tasks(&self) -> bool {
336         let running_tasks = self.scheduler.running_tasks();
337         if running_tasks != 0 {
338             info!("running {} tasks when unload SA", running_tasks,);
339             return true;
340         }
341 
342         // check rx again for there may be new message arrive.
343         if !self.rx.is_empty() {
344             return true;
345         }
346         false
347     }
348 
unload_sa(&mut self) -> bool349     fn unload_sa(&mut self) -> bool {
350         if self.check_any_tasks() {
351             return false;
352         }
353 
354         const TIMES: usize = 10;
355         const PRE_COUNT: usize = 1000;
356 
357         for _i in 0..TIMES {
358             let remain = clear_database_part(PRE_COUNT).unwrap_or(false);
359             if self.check_any_tasks() {
360                 return false;
361             }
362             if !remain {
363                 break;
364             }
365         }
366         NotificationDispatcher::get_instance().clear_group_info();
367 
368         const REQUEST_SERVICE_ID: i32 = 3706;
369         const ONE_MONTH: i64 = 30 * 24 * 60 * 60 * 1000;
370 
371         let db = RequestDb::get_instance();
372 
373         let filter = TaskFilter {
374             before: get_current_timestamp() as i64,
375             after: get_current_timestamp() as i64 - ONE_MONTH,
376             state: State::Waiting.repr,
377             action: Action::Any.repr,
378             mode: Mode::Any.repr,
379         };
380 
381         let bundle_name = "*".to_string();
382 
383         let task_ids = db.system_search_task(filter, bundle_name);
384 
385         info!("unload SA");
386         task_unload();
387 
388         let any_tasks = task_ids.is_empty();
389         let update_on_demand_policy = update_policy(any_tasks);
390         if update_on_demand_policy != 0 {
391             info!("Update on demand policy failed");
392         }
393 
394         // failed logic?
395         #[cfg(feature = "oh")]
396         let _ = SystemAbilityManager::unload_system_ability(REQUEST_SERVICE_ID);
397 
398         true
399     }
400 }
401 
402 #[cxx::bridge(namespace = "OHOS::Request")]
403 mod ffi {
404     #[derive(Clone, Debug, Copy)]
405     pub(crate) struct TaskQosInfo {
406         pub(crate) task_id: u32,
407         pub(crate) action: u8,
408         pub(crate) mode: u8,
409         pub(crate) state: u8,
410         pub(crate) priority: u32,
411     }
412 
413     unsafe extern "C++" {
414         include!("system_ability_manager.h");
415         include!("system_ability_on_demand_event.h");
416     }
417 }
418 
419 #[allow(unreachable_pub)]
420 #[derive(Clone)]
421 pub struct TaskManagerTx {
422     pub(crate) tx: UnboundedSender<TaskManagerEvent>,
423 }
424 
425 impl TaskManagerTx {
new(tx: UnboundedSender<TaskManagerEvent>) -> Self426     pub(crate) fn new(tx: UnboundedSender<TaskManagerEvent>) -> Self {
427         Self { tx }
428     }
429 
send_event(&self, event: TaskManagerEvent) -> bool430     pub(crate) fn send_event(&self, event: TaskManagerEvent) -> bool {
431         if self.tx.send(event).is_err() {
432             #[cfg(feature = "oh")]
433             unsafe {
434                 if let Some(e) = PANIC_INFO.as_ref() {
435                     error!("Sends TaskManager event failed {}", e);
436                 } else {
437                     info!("TaskManager is unloading");
438                 }
439             }
440             return false;
441         }
442         true
443     }
444 
notify_foreground_app_change(&self, uid: u64)445     pub(crate) fn notify_foreground_app_change(&self, uid: u64) {
446         let _ = self.send_event(TaskManagerEvent::State(StateEvent::ForegroundApp(uid)));
447     }
448 
notify_app_background(&self, uid: u64)449     pub(crate) fn notify_app_background(&self, uid: u64) {
450         let _ = self.send_event(TaskManagerEvent::State(StateEvent::Background(uid)));
451     }
452 
trigger_background_timeout(&self, uid: u64)453     pub(crate) fn trigger_background_timeout(&self, uid: u64) {
454         let _ = self.send_event(TaskManagerEvent::State(StateEvent::BackgroundTimeout(uid)));
455     }
456 
notify_special_process_terminate(&self, uid: u64)457     pub(crate) fn notify_special_process_terminate(&self, uid: u64) {
458         let _ = self.send_event(TaskManagerEvent::State(StateEvent::SpecialTerminate(uid)));
459     }
460 
show(&self, uid: u64, task_id: u32) -> Option<TaskInfo>461     pub(crate) fn show(&self, uid: u64, task_id: u32) -> Option<TaskInfo> {
462         let (tx, rx) = oneshot::channel();
463         let event = QueryEvent::Show(task_id, uid, tx);
464         let _ = self.send_event(TaskManagerEvent::Query(event));
465         ylong_runtime::block_on(rx).unwrap()
466     }
467 
query(&self, task_id: u32, action: Action) -> Option<TaskInfo>468     pub(crate) fn query(&self, task_id: u32, action: Action) -> Option<TaskInfo> {
469         let (tx, rx) = oneshot::channel();
470         let event = QueryEvent::Query(task_id, action, tx);
471         let _ = self.send_event(TaskManagerEvent::Query(event));
472         ylong_runtime::block_on(rx).unwrap()
473     }
474 
touch(&self, uid: u64, task_id: u32, token: String) -> Option<TaskInfo>475     pub(crate) fn touch(&self, uid: u64, task_id: u32, token: String) -> Option<TaskInfo> {
476         let (tx, rx) = oneshot::channel();
477         let event = QueryEvent::Touch(task_id, uid, token, tx);
478         let _ = self.send_event(TaskManagerEvent::Query(event));
479         ylong_runtime::block_on(rx).unwrap()
480     }
481 }
482 
483 pub(crate) struct TaskManagerRx {
484     rx: UnboundedReceiver<TaskManagerEvent>,
485 }
486 
487 impl TaskManagerRx {
new(rx: UnboundedReceiver<TaskManagerEvent>) -> Self488     pub(crate) fn new(rx: UnboundedReceiver<TaskManagerEvent>) -> Self {
489         Self { rx }
490     }
491 }
492 
493 impl Deref for TaskManagerRx {
494     type Target = UnboundedReceiver<TaskManagerEvent>;
495 
deref(&self) -> &Self::Target496     fn deref(&self) -> &Self::Target {
497         &self.rx
498     }
499 }
500 
501 impl DerefMut for TaskManagerRx {
deref_mut(&mut self) -> &mut Self::Target502     fn deref_mut(&mut self) -> &mut Self::Target {
503         &mut self.rx
504     }
505 }
506 
restore_all_tasks(tx: TaskManagerTx)507 async fn restore_all_tasks(tx: TaskManagerTx) {
508     sleep(Duration::from_secs(RESTORE_ALL_TASKS_INTERVAL)).await;
509     let _ = tx.send_event(TaskManagerEvent::Schedule(ScheduleEvent::RestoreAllTasks));
510 }
511 
clear_timeout_tasks(tx: TaskManagerTx)512 async fn clear_timeout_tasks(tx: TaskManagerTx) {
513     loop {
514         sleep(Duration::from_secs(CLEAR_INTERVAL)).await;
515         let _ = tx.send_event(TaskManagerEvent::Schedule(ScheduleEvent::ClearTimeoutTasks));
516     }
517 }
518