1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::collections::HashMap;
15 use std::ops::{Deref, DerefMut};
16 use std::time::Duration;
17
18 use samgr::definition::COMM_NET_CONN_MANAGER_SYS_ABILITY_ID;
19 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
20 use ylong_runtime::sync::oneshot;
21 use ylong_runtime::time::sleep;
22
23 cfg_oh! {
24 use samgr::manage::SystemAbilityManager;
25 use crate::ability::PANIC_INFO;
26 use crate::manage::account::registry_account_subscribe;
27 }
28 use super::account::{remove_account_tasks, AccountEvent};
29 use super::database::RequestDb;
30 use super::events::{
31 QueryEvent, ScheduleEvent, ServiceEvent, StateEvent, TaskEvent, TaskManagerEvent,
32 };
33 use crate::config::{Action, Mode};
34 use crate::database::clear_database_part;
35 use crate::error::ErrorCode;
36 use crate::info::{State, TaskInfo};
37 use crate::manage::app_state::AppUninstallSubscriber;
38 use crate::manage::network::register_network_change;
39 use crate::manage::network_manager::NetworkManager;
40 use crate::manage::query::TaskFilter;
41 use crate::manage::scheduler::state::Handler;
42 use crate::manage::scheduler::Scheduler;
43 use crate::service::active_counter::ActiveCounter;
44 use crate::service::client::ClientManagerEntry;
45 use crate::service::notification_bar::{subscribe_notification_bar, NotificationDispatcher};
46 use crate::service::run_count::RunCountManagerEntry;
47 use crate::utils::task_event_count::{task_complete_add, task_fail_add, task_unload};
48 use crate::utils::{get_current_timestamp, runtime_spawn, subscribe_common_event, update_policy};
49
// Pause, in seconds, between two `ScheduleEvent::ClearTimeoutTasks` events
// sent by the `clear_timeout_tasks` future.
const CLEAR_INTERVAL: u64 = 30 * 60;
// Delay, in seconds, before the `restore_all_tasks` future sends its single
// `ScheduleEvent::RestoreAllTasks` event.
const RESTORE_ALL_TASKS_INTERVAL: u64 = 10;
52
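// TaskManager initialization logic:
//
// First, determine where tasks come from: 1) tasks submitted by applications;
// 2) unfinished tasks stored in the database.
// Second, determine when the SA is started: 1) started when Wi-Fi connects;
// 2) started by an application.

// QoS scheduling steps:
// 1. When the SA starts, the tasks in the Waiting + QosWaiting states (their
//    QoS information) and the related application information are read from
//    the database and stored, sorted, in the QoS structure. This triggers an
//    initial task load.
// 2. When a new task is added to the SA, or the network state or the
//    foreground/background state changes, the QoS structure is updated and
//    re-sorted and a task load is triggered: runnable tasks are loaded into
//    memory for processing, and non-runnable tasks are returned to the database.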
64
/// Central event consumer: owns the scheduler and the receiving half of the
/// `TaskManagerEvent` channel that `run` polls.
pub(crate) struct TaskManager {
    // Scheduler that the event handlers forward their work to.
    pub(crate) scheduler: Scheduler,
    // Receiving half of the event channel.
    pub(crate) rx: TaskManagerRx,
    // Client manager handle; a clone of it is also passed to `Scheduler::init`.
    pub(crate) client_manager: ClientManagerEntry,
    // Task counters: the first `usize` is for foreground tasks, the second for
    // background tasks.
    // NOTE(review): the key appears to be the application uid (it is passed
    // next to `uid` in `Scheduler::reduce_task_count`) - confirm.
    pub(crate) task_count: HashMap<u64, (usize, usize)>,
}
72
73 impl TaskManager {
init( runcount_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, active_counter: ActiveCounter, #[cfg(not(feature = "oh"))] network: Network, ) -> TaskManagerTx74 pub(crate) fn init(
75 runcount_manager: RunCountManagerEntry,
76 client_manager: ClientManagerEntry,
77 active_counter: ActiveCounter,
78 #[cfg(not(feature = "oh"))] network: Network,
79 ) -> TaskManagerTx {
80 debug!("TaskManager init");
81
82 let (tx, rx) = unbounded_channel();
83 let tx = TaskManagerTx::new(tx);
84 let rx = TaskManagerRx::new(rx);
85
86 #[cfg(feature = "oh")]
87 registry_account_subscribe(tx.clone());
88
89 #[cfg(feature = "oh")]
90 {
91 let mut network_manager = NetworkManager::get_instance().lock().unwrap();
92 network_manager.tx = Some(tx.clone());
93 SystemAbilityManager::subscribe_system_ability(
94 COMM_NET_CONN_MANAGER_SYS_ABILITY_ID,
95 |_, _| {
96 register_network_change();
97 },
98 |_, _| {
99 info!("network service died");
100 },
101 );
102 }
103 #[cfg(feature = "oh")]
104 register_network_change();
105 subscribe_notification_bar(tx.clone());
106
107 if let Err(e) = subscribe_common_event(
108 vec![
109 "usual.event.PACKAGE_REMOVED",
110 "usual.event.BUNDLE_REMOVED",
111 "usual.event.PACKAGE_FULLY_REMOVED",
112 ],
113 AppUninstallSubscriber::new(tx.clone()),
114 ) {
115 error!("Subscribe app uninstall event failed: {}", e);
116 sys_event!(
117 ExecFault,
118 DfxCode::EVENT_FAULT_01,
119 &format!("Subscribe app uninstall event failed: {}", e)
120 );
121 }
122
123 let task_manager = Self::new(
124 tx.clone(),
125 rx,
126 runcount_manager,
127 client_manager,
128 active_counter,
129 );
130
131 // Performance optimization tips for task restoring:
132 //
133 // When SA is initializing, it will create and initialize an app sorting
134 // queue in `scheduler.QoS`, but there is no task rescheduling or
135 // execution at this time.
136 //
137 // After SA initialization, we will start a coroutine to recover all
138 // tasks, which is used to notify `TaskManager` to recover waiting tasks
139 // in the database.
140 //
141 // If a new task is started at this time, this future can
142 // be removed because the scheduler will also be rearranged in the
143 // startup logic of the new task.
144 runtime_spawn(restore_all_tasks(tx.clone()));
145
146 runtime_spawn(clear_timeout_tasks(tx.clone()));
147 runtime_spawn(task_manager.run());
148 tx
149 }
150
new( tx: TaskManagerTx, rx: TaskManagerRx, run_count_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, active_counter: ActiveCounter, ) -> Self151 pub(crate) fn new(
152 tx: TaskManagerTx,
153 rx: TaskManagerRx,
154 run_count_manager: RunCountManagerEntry,
155 client_manager: ClientManagerEntry,
156 active_counter: ActiveCounter,
157 ) -> Self {
158 Self {
159 scheduler: Scheduler::init(
160 tx.clone(),
161 run_count_manager,
162 client_manager.clone(),
163 active_counter,
164 ),
165 rx,
166 client_manager,
167 task_count: HashMap::new(),
168 }
169 }
170
run(mut self)171 async fn run(mut self) {
172 let db = RequestDb::get_instance();
173 db.clear_invalid_records();
174 loop {
175 let event = match self.rx.recv().await {
176 Ok(event) => event,
177 Err(e) => {
178 error!("TaskManager receives error {:?}", e);
179 continue;
180 }
181 };
182
183 match event {
184 TaskManagerEvent::Service(event) => self.handle_service_event(event),
185 TaskManagerEvent::State(event) => self.handle_state_event(event),
186 TaskManagerEvent::Task(event) => self.handle_task_event(event),
187 TaskManagerEvent::Schedule(event) => {
188 if self.handle_schedule_event(event) {
189 info!("TaskManager unload ok");
190 // If unload_sa success, can not breaks this loop.
191 }
192 }
193 TaskManagerEvent::Device(level) => {
194 self.scheduler.on_rss_change(level);
195 }
196 TaskManagerEvent::Account(event) => self.handle_account_event(event),
197 TaskManagerEvent::Query(query) => self.handle_query_event(query),
198 TaskManagerEvent::Reschedule => self.scheduler.reschedule(),
199 }
200
201 debug!("TaskManager handles events finished");
202 }
203 }
204
handle_account_event(&mut self, event: AccountEvent)205 pub(crate) fn handle_account_event(&mut self, event: AccountEvent) {
206 match event {
207 AccountEvent::Remove(user_id) => remove_account_tasks(user_id),
208 AccountEvent::Changed => self.scheduler.on_state_change(Handler::update_account, ()),
209 }
210 }
211
handle_service_event(&mut self, event: ServiceEvent)212 fn handle_service_event(&mut self, event: ServiceEvent) {
213 debug!("TaskManager handles service event {:?}", event);
214
215 match event {
216 ServiceEvent::Construct(msg, tx) => {
217 let _ = tx.send(self.create(msg.config));
218 }
219 ServiceEvent::Start(uid, task_id, tx) => {
220 let _ = tx.send(self.start(uid, task_id));
221 }
222 ServiceEvent::Stop(uid, task_id, tx) => {
223 let _ = tx.send(self.stop(uid, task_id));
224 }
225 ServiceEvent::Pause(uid, task_id, tx) => {
226 let _ = tx.send(self.pause(uid, task_id));
227 }
228 ServiceEvent::Resume(uid, task_id, tx) => {
229 let _ = tx.send(self.resume(uid, task_id));
230 }
231 ServiceEvent::Remove(uid, task_id, tx) => {
232 let _ = tx.send(self.remove(uid, task_id));
233 }
234 ServiceEvent::SetMaxSpeed(uid, task_id, max_speed, tx) => {
235 let _ = tx.send(self.set_max_speed(uid, task_id, max_speed));
236 }
237 ServiceEvent::DumpAll(tx) => {
238 let _ = tx.send(self.query_all_task());
239 }
240 ServiceEvent::DumpOne(task_id, tx) => {
241 let _ = tx.send(self.query_one_task(task_id));
242 }
243 ServiceEvent::AttachGroup(uid, task_ids, group, tx) => {
244 let _ = tx.send(self.attach_group(uid, task_ids, group));
245 }
246 ServiceEvent::SetMode(uid, task_id, mode, tx) => {
247 let _ = tx.send(self.set_mode(uid, task_id, mode));
248 }
249 }
250 }
251
handle_state_event(&mut self, event: StateEvent)252 fn handle_state_event(&mut self, event: StateEvent) {
253 debug!("TaskManager handles state event {:?}", event);
254
255 match event {
256 StateEvent::Network => {
257 self.scheduler.retry_all_tasks();
258 self.scheduler.on_state_change(Handler::update_network, ());
259 }
260
261 StateEvent::ForegroundApp(uid) => {
262 self.scheduler.on_state_change(Handler::update_top_uid, uid);
263 }
264 StateEvent::Background(uid) => self
265 .scheduler
266 .on_state_change(Handler::update_background, uid),
267 StateEvent::BackgroundTimeout(uid) => self
268 .scheduler
269 .on_state_change(Handler::update_background_timeout, uid),
270 StateEvent::AppUninstall(uid) => {
271 self.scheduler.on_state_change(Handler::app_uninstall, uid);
272 }
273 StateEvent::SpecialTerminate(uid) => {
274 self.scheduler
275 .on_state_change(Handler::special_process_terminate, uid);
276 }
277 }
278 }
279
handle_task_event(&mut self, event: TaskEvent)280 fn handle_task_event(&mut self, event: TaskEvent) {
281 debug!("TaskManager handles task event {:?}", event);
282
283 match event {
284 TaskEvent::Subscribe(task_id, token_id, tx) => {
285 let _ = tx.send(self.check_subscriber(task_id, token_id));
286 }
287 TaskEvent::Completed(task_id, uid, mode) => {
288 Scheduler::reduce_task_count(uid, mode, &mut self.task_count);
289 task_complete_add();
290 self.scheduler.task_completed(uid, task_id);
291 }
292 TaskEvent::Running(task_id, uid, mode) => {
293 self.scheduler
294 .task_cancel(uid, task_id, mode, &mut self.task_count);
295 }
296 TaskEvent::Failed(task_id, uid, reason, mode) => {
297 Scheduler::reduce_task_count(uid, mode, &mut self.task_count);
298 task_fail_add();
299 self.scheduler.task_failed(uid, task_id, reason);
300 }
301 TaskEvent::Offline(task_id, uid, mode) => {
302 self.scheduler
303 .task_cancel(uid, task_id, mode, &mut self.task_count);
304 }
305 };
306 }
307
handle_schedule_event(&mut self, message: ScheduleEvent) -> bool308 fn handle_schedule_event(&mut self, message: ScheduleEvent) -> bool {
309 debug!("TaskManager handle scheduled_message {:?}", message);
310
311 match message {
312 ScheduleEvent::ClearTimeoutTasks => self.clear_timeout_tasks(),
313 ScheduleEvent::RestoreAllTasks => self.restore_all_tasks(),
314 ScheduleEvent::Unload => return self.unload_sa(),
315 }
316 false
317 }
318
check_subscriber(&self, task_id: u32, token_id: u64) -> ErrorCode319 fn check_subscriber(&self, task_id: u32, token_id: u64) -> ErrorCode {
320 match RequestDb::get_instance().query_task_token_id(task_id) {
321 Ok(id) if id == token_id => ErrorCode::ErrOk,
322 Ok(_) => ErrorCode::Permission,
323 Err(_) => ErrorCode::TaskNotFound,
324 }
325 }
326
clear_timeout_tasks(&mut self)327 fn clear_timeout_tasks(&mut self) {
328 self.scheduler.clear_timeout_tasks();
329 }
330
restore_all_tasks(&mut self)331 fn restore_all_tasks(&mut self) {
332 self.scheduler.restore_all_tasks();
333 }
334
check_any_tasks(&self) -> bool335 fn check_any_tasks(&self) -> bool {
336 let running_tasks = self.scheduler.running_tasks();
337 if running_tasks != 0 {
338 info!("running {} tasks when unload SA", running_tasks,);
339 return true;
340 }
341
342 // check rx again for there may be new message arrive.
343 if !self.rx.is_empty() {
344 return true;
345 }
346 false
347 }
348
unload_sa(&mut self) -> bool349 fn unload_sa(&mut self) -> bool {
350 if self.check_any_tasks() {
351 return false;
352 }
353
354 const TIMES: usize = 10;
355 const PRE_COUNT: usize = 1000;
356
357 for _i in 0..TIMES {
358 let remain = clear_database_part(PRE_COUNT).unwrap_or(false);
359 if self.check_any_tasks() {
360 return false;
361 }
362 if !remain {
363 break;
364 }
365 }
366 NotificationDispatcher::get_instance().clear_group_info();
367
368 const REQUEST_SERVICE_ID: i32 = 3706;
369 const ONE_MONTH: i64 = 30 * 24 * 60 * 60 * 1000;
370
371 let db = RequestDb::get_instance();
372
373 let filter = TaskFilter {
374 before: get_current_timestamp() as i64,
375 after: get_current_timestamp() as i64 - ONE_MONTH,
376 state: State::Waiting.repr,
377 action: Action::Any.repr,
378 mode: Mode::Any.repr,
379 };
380
381 let bundle_name = "*".to_string();
382
383 let task_ids = db.system_search_task(filter, bundle_name);
384
385 info!("unload SA");
386 task_unload();
387
388 let any_tasks = task_ids.is_empty();
389 let update_on_demand_policy = update_policy(any_tasks);
390 if update_on_demand_policy != 0 {
391 info!("Update on demand policy failed");
392 }
393
394 // failed logic?
395 #[cfg(feature = "oh")]
396 let _ = SystemAbilityManager::unload_system_ability(REQUEST_SERVICE_ID);
397
398 true
399 }
400 }
401
// cxx bridge for the `OHOS::Request` namespace: declares the shared
// `TaskQosInfo` struct and pulls in the system ability headers.
#[cxx::bridge(namespace = "OHOS::Request")]
mod ffi {
    // Plain-data description of one task: its id, action, mode, state and
    // priority.
    // NOTE(review): `action`, `mode` and `state` presumably carry the `repr`
    // values of the matching Rust enums - confirm.
    #[derive(Clone, Debug, Copy)]
    pub(crate) struct TaskQosInfo {
        pub(crate) task_id: u32,
        pub(crate) action: u8,
        pub(crate) mode: u8,
        pub(crate) state: u8,
        pub(crate) priority: u32,
    }

    // Only headers are included here; no C++ functions are declared.
    unsafe extern "C++" {
        include!("system_ability_manager.h");
        include!("system_ability_on_demand_event.h");
    }
}
418
/// Cloneable sending handle for posting `TaskManagerEvent`s to the
/// `TaskManager`.
// NOTE(review): `unreachable_pub` is silenced because the type is declared
// `pub`; presumably it is not reachable from outside the crate - confirm.
#[allow(unreachable_pub)]
#[derive(Clone)]
pub struct TaskManagerTx {
    // Sending half of the unbounded event channel.
    pub(crate) tx: UnboundedSender<TaskManagerEvent>,
}
424
425 impl TaskManagerTx {
new(tx: UnboundedSender<TaskManagerEvent>) -> Self426 pub(crate) fn new(tx: UnboundedSender<TaskManagerEvent>) -> Self {
427 Self { tx }
428 }
429
send_event(&self, event: TaskManagerEvent) -> bool430 pub(crate) fn send_event(&self, event: TaskManagerEvent) -> bool {
431 if self.tx.send(event).is_err() {
432 #[cfg(feature = "oh")]
433 unsafe {
434 if let Some(e) = PANIC_INFO.as_ref() {
435 error!("Sends TaskManager event failed {}", e);
436 } else {
437 info!("TaskManager is unloading");
438 }
439 }
440 return false;
441 }
442 true
443 }
444
notify_foreground_app_change(&self, uid: u64)445 pub(crate) fn notify_foreground_app_change(&self, uid: u64) {
446 let _ = self.send_event(TaskManagerEvent::State(StateEvent::ForegroundApp(uid)));
447 }
448
notify_app_background(&self, uid: u64)449 pub(crate) fn notify_app_background(&self, uid: u64) {
450 let _ = self.send_event(TaskManagerEvent::State(StateEvent::Background(uid)));
451 }
452
trigger_background_timeout(&self, uid: u64)453 pub(crate) fn trigger_background_timeout(&self, uid: u64) {
454 let _ = self.send_event(TaskManagerEvent::State(StateEvent::BackgroundTimeout(uid)));
455 }
456
notify_special_process_terminate(&self, uid: u64)457 pub(crate) fn notify_special_process_terminate(&self, uid: u64) {
458 let _ = self.send_event(TaskManagerEvent::State(StateEvent::SpecialTerminate(uid)));
459 }
460
show(&self, uid: u64, task_id: u32) -> Option<TaskInfo>461 pub(crate) fn show(&self, uid: u64, task_id: u32) -> Option<TaskInfo> {
462 let (tx, rx) = oneshot::channel();
463 let event = QueryEvent::Show(task_id, uid, tx);
464 let _ = self.send_event(TaskManagerEvent::Query(event));
465 ylong_runtime::block_on(rx).unwrap()
466 }
467
query(&self, task_id: u32, action: Action) -> Option<TaskInfo>468 pub(crate) fn query(&self, task_id: u32, action: Action) -> Option<TaskInfo> {
469 let (tx, rx) = oneshot::channel();
470 let event = QueryEvent::Query(task_id, action, tx);
471 let _ = self.send_event(TaskManagerEvent::Query(event));
472 ylong_runtime::block_on(rx).unwrap()
473 }
474
touch(&self, uid: u64, task_id: u32, token: String) -> Option<TaskInfo>475 pub(crate) fn touch(&self, uid: u64, task_id: u32, token: String) -> Option<TaskInfo> {
476 let (tx, rx) = oneshot::channel();
477 let event = QueryEvent::Touch(task_id, uid, token, tx);
478 let _ = self.send_event(TaskManagerEvent::Query(event));
479 ylong_runtime::block_on(rx).unwrap()
480 }
481 }
482
/// Wrapper around the receiving half of the `TaskManagerEvent` channel; it
/// dereferences to the inner `UnboundedReceiver`.
pub(crate) struct TaskManagerRx {
    // Receiving half of the unbounded event channel.
    rx: UnboundedReceiver<TaskManagerEvent>,
}
486
487 impl TaskManagerRx {
new(rx: UnboundedReceiver<TaskManagerEvent>) -> Self488 pub(crate) fn new(rx: UnboundedReceiver<TaskManagerEvent>) -> Self {
489 Self { rx }
490 }
491 }
492
493 impl Deref for TaskManagerRx {
494 type Target = UnboundedReceiver<TaskManagerEvent>;
495
deref(&self) -> &Self::Target496 fn deref(&self) -> &Self::Target {
497 &self.rx
498 }
499 }
500
501 impl DerefMut for TaskManagerRx {
deref_mut(&mut self) -> &mut Self::Target502 fn deref_mut(&mut self) -> &mut Self::Target {
503 &mut self.rx
504 }
505 }
506
restore_all_tasks(tx: TaskManagerTx)507 async fn restore_all_tasks(tx: TaskManagerTx) {
508 sleep(Duration::from_secs(RESTORE_ALL_TASKS_INTERVAL)).await;
509 let _ = tx.send_event(TaskManagerEvent::Schedule(ScheduleEvent::RestoreAllTasks));
510 }
511
clear_timeout_tasks(tx: TaskManagerTx)512 async fn clear_timeout_tasks(tx: TaskManagerTx) {
513 loop {
514 sleep(Duration::from_secs(CLEAR_INTERVAL)).await;
515 let _ = tx.send_event(TaskManagerEvent::Schedule(ScheduleEvent::ClearTimeoutTasks));
516 }
517 }
518