1 // Copyright (C) 2023 Huawei Device Co., Ltd.
2 // Licensed under the Apache License, Version 2.0 (the "License");
3 // you may not use this file except in compliance with the License.
4 // You may obtain a copy of the License at
5 //
6 // http://www.apache.org/licenses/LICENSE-2.0
7 //
8 // Unless required by applicable law or agreed to in writing, software
9 // distributed under the License is distributed on an "AS IS" BASIS,
10 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 // See the License for the specific language governing permissions and
12 // limitations under the License.
13
14 use std::collections::HashMap;
15 use std::ops::{Deref, DerefMut};
16 use std::time::Duration;
17
18 use samgr::definition::COMM_NET_CONN_MANAGER_SYS_ABILITY_ID;
19 use ylong_runtime::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
20 use ylong_runtime::sync::oneshot;
21 use ylong_runtime::time::sleep;
22
23 cfg_oh! {
24 use samgr::manage::SystemAbilityManager;
25 use crate::ability::PANIC_INFO;
26 use crate::manage::account::registry_account_subscribe;
27 }
28 use super::account::{remove_account_tasks, AccountEvent};
29 use super::database::RequestDb;
30 use super::events::{
31 QueryEvent, ScheduleEvent, ServiceEvent, StateEvent, TaskEvent, TaskManagerEvent,
32 };
33 use crate::config::Action;
34 use crate::error::ErrorCode;
35 use crate::info::TaskInfo;
36 use crate::manage::network::register_network_change;
37 use crate::manage::network_manager::NetworkManager;
38 use crate::manage::scheduler::state::Handler;
39 use crate::manage::scheduler::Scheduler;
40 use crate::service::client::ClientManagerEntry;
41 use crate::service::notification_bar::subscribe_notification_bar;
42 use crate::service::run_count::RunCountManagerEntry;
43 use crate::utils::runtime_spawn;
44
45 const CLEAR_INTERVAL: u64 = 30 * 60;
46 const LOG_INTERVAL: u64 = 5 * 60;
47 const RESTORE_ALL_TASKS_INTERVAL: u64 = 10;
48
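// Initialization logic of TaskManager:
//
// First, determine where tasks come from: 1) tasks submitted by applications;
// 2) unfinished tasks stored in the database.
// Second, determine when the SA is started: 1) started by a WIFI connection;
// 2) started by an application.

// QoS scheduling steps:
// 1. When the SA starts, the tasks in the Waiting + QosWaiting states (their
//    QoS information) and the related application information are read from
//    the database and stored, sorted, in the QoS structure. This triggers an
//    initial task load.
// 2. When a new task is added on the SA side, when the network state changes,
//    or when the foreground/background state changes, the QoS structure is
//    updated and re-sorted and a task load is triggered: executable tasks are
//    loaded into memory for processing, and non-executable tasks are returned
//    to the database.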
60
/// Central event loop state of the request service.
///
/// Owns the scheduler and the receiving half of the `TaskManagerEvent`
/// channel; every event sent through a `TaskManagerTx` is handled by this
/// structure.
pub(crate) struct TaskManager {
    /// Scheduler that every state, task and schedule event is forwarded to.
    pub(crate) scheduler: Scheduler,
    /// Receiving half of the event channel drained by the manager loop.
    pub(crate) rx: TaskManagerRx,
    /// Handle to the client manager; a clone is also given to the scheduler.
    pub(crate) client_manager: ClientManagerEntry,
    /// Task counters per key: the first `usize` is for foreground tasks, the
    /// second for background tasks.
    // NOTE(review): the `u64` key is presumably the application uid (the map
    // is passed next to a `uid` when counters are reduced) — confirm.
    pub(crate) task_count: HashMap<u64, (usize, usize)>,
}
68
69 impl TaskManager {
init( runcount_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, #[cfg(not(feature = "oh"))] network: Network, ) -> TaskManagerTx70 pub(crate) fn init(
71 runcount_manager: RunCountManagerEntry,
72 client_manager: ClientManagerEntry,
73 #[cfg(not(feature = "oh"))] network: Network,
74 ) -> TaskManagerTx {
75 debug!("TaskManager init");
76
77 let (tx, rx) = unbounded_channel();
78 let tx = TaskManagerTx::new(tx);
79 let rx = TaskManagerRx::new(rx);
80
81 #[cfg(feature = "oh")]
82 registry_account_subscribe(tx.clone());
83
84 #[cfg(feature = "oh")]
85 {
86 let mut network_manager = NetworkManager::get_instance().lock().unwrap();
87 network_manager.tx = Some(tx.clone());
88 SystemAbilityManager::subscribe_system_ability(
89 COMM_NET_CONN_MANAGER_SYS_ABILITY_ID,
90 |_, _| {
91 register_network_change();
92 },
93 |_, _| {
94 info!("network service died");
95 },
96 );
97 }
98 #[cfg(feature = "oh")]
99 register_network_change();
100 subscribe_notification_bar(tx.clone());
101 let task_manager = Self::new(tx.clone(), rx, runcount_manager, client_manager);
102
103 // Performance optimization tips for task restoring:
104 //
105 // When SA is initializing, it will create and initialize an app sorting
106 // queue in `scheduler.QoS`, but there is no task rescheduling or
107 // execution at this time.
108 //
109 // After SA initialization, we will start a coroutine to recover all
110 // tasks, which is used to notify `TaskManager` to recover waiting tasks
111 // in the database.
112 //
113 // If a new task is started at this time, this future can
114 // be removed because the scheduler will also be rearranged in the
115 // startup logic of the new task.
116 runtime_spawn(restore_all_tasks(tx.clone()));
117
118 runtime_spawn(clear_timeout_tasks(tx.clone()));
119 runtime_spawn(log_all_task_info(tx.clone()));
120 runtime_spawn(task_manager.run());
121 tx
122 }
123
new( tx: TaskManagerTx, rx: TaskManagerRx, run_count_manager: RunCountManagerEntry, client_manager: ClientManagerEntry, ) -> Self124 pub(crate) fn new(
125 tx: TaskManagerTx,
126 rx: TaskManagerRx,
127 run_count_manager: RunCountManagerEntry,
128 client_manager: ClientManagerEntry,
129 ) -> Self {
130 Self {
131 scheduler: Scheduler::init(tx.clone(), run_count_manager, client_manager.clone()),
132 rx,
133 client_manager,
134 task_count: HashMap::new(),
135 }
136 }
137
run(mut self)138 async fn run(mut self) {
139 let db = RequestDb::get_instance();
140 db.clear_invalid_records();
141 loop {
142 let event = match self.rx.recv().await {
143 Ok(event) => event,
144 Err(e) => {
145 error!("TaskManager receives error {:?}", e);
146 continue;
147 }
148 };
149
150 match event {
151 TaskManagerEvent::Service(event) => self.handle_service_event(event),
152 TaskManagerEvent::State(event) => self.handle_state_event(event),
153 TaskManagerEvent::Task(event) => self.handle_task_event(event),
154 TaskManagerEvent::Schedule(event) => {
155 if self.handle_schedule_event(event) {
156 info!("TaskManager unload ok");
157 // If unload_sa success, can not breaks this loop.
158 }
159 }
160 TaskManagerEvent::Device(level) => {
161 self.scheduler.on_rss_change(level);
162 }
163 TaskManagerEvent::Account(event) => self.handle_account_event(event),
164 TaskManagerEvent::Query(query) => self.handle_query_event(query),
165 TaskManagerEvent::Reschedule => self.scheduler.reschedule(),
166 }
167
168 debug!("TaskManager handles events finished");
169 }
170 }
171
handle_account_event(&mut self, event: AccountEvent)172 pub(crate) fn handle_account_event(&mut self, event: AccountEvent) {
173 match event {
174 AccountEvent::Remove(user_id) => remove_account_tasks(user_id),
175 AccountEvent::Changed => self.scheduler.on_state_change(Handler::update_account, ()),
176 }
177 }
178
handle_service_event(&mut self, event: ServiceEvent)179 fn handle_service_event(&mut self, event: ServiceEvent) {
180 debug!("TaskManager handles service event {:?}", event);
181
182 match event {
183 ServiceEvent::Construct(msg, tx) => {
184 let _ = tx.send(self.create(msg.config));
185 }
186 ServiceEvent::Start(uid, task_id, tx) => {
187 let _ = tx.send(self.start(uid, task_id));
188 }
189 ServiceEvent::Stop(uid, task_id, tx) => {
190 let _ = tx.send(self.stop(uid, task_id));
191 }
192 ServiceEvent::Pause(uid, task_id, tx) => {
193 let _ = tx.send(self.pause(uid, task_id));
194 }
195 ServiceEvent::Resume(uid, task_id, tx) => {
196 let _ = tx.send(self.resume(uid, task_id));
197 }
198 ServiceEvent::Remove(uid, task_id, tx) => {
199 let _ = tx.send(self.remove(uid, task_id));
200 }
201 ServiceEvent::DumpAll(tx) => {
202 let _ = tx.send(self.query_all_task());
203 }
204 ServiceEvent::DumpOne(task_id, tx) => {
205 let _ = tx.send(self.query_one_task(task_id));
206 }
207 ServiceEvent::AttachGroup(uid, task_ids, group, tx) => {
208 let _ = tx.send(self.attach_group(uid, task_ids, group));
209 }
210 }
211 }
212
handle_state_event(&mut self, event: StateEvent)213 fn handle_state_event(&mut self, event: StateEvent) {
214 debug!("TaskManager handles state event {:?}", event);
215
216 match event {
217 StateEvent::Network => {
218 self.scheduler.retry_all_tasks();
219 self.scheduler.on_state_change(Handler::update_network, ());
220 }
221
222 StateEvent::ForegroundApp(uid) => {
223 self.scheduler.on_state_change(Handler::update_top_uid, uid);
224 }
225 StateEvent::Background(uid) => self
226 .scheduler
227 .on_state_change(Handler::update_background, uid),
228 StateEvent::BackgroundTimeout(uid) => self
229 .scheduler
230 .on_state_change(Handler::update_background_timeout, uid),
231 StateEvent::SpecialTerminate(uid) => {
232 self.scheduler
233 .on_state_change(Handler::special_process_terminate, uid);
234 }
235 }
236 }
237
handle_task_event(&mut self, event: TaskEvent)238 fn handle_task_event(&mut self, event: TaskEvent) {
239 debug!("TaskManager handles task event {:?}", event);
240
241 match event {
242 TaskEvent::Subscribe(task_id, token_id, tx) => {
243 let _ = tx.send(self.check_subscriber(task_id, token_id));
244 }
245 TaskEvent::Completed(task_id, uid, mode) => {
246 Scheduler::reduce_task_count(uid, mode, &mut self.task_count);
247 self.scheduler.task_completed(uid, task_id);
248 }
249 TaskEvent::Running(task_id, uid, mode) => {
250 self.scheduler
251 .task_cancel(uid, task_id, mode, &mut self.task_count);
252 }
253 TaskEvent::Failed(task_id, uid, reason, mode) => {
254 Scheduler::reduce_task_count(uid, mode, &mut self.task_count);
255 self.scheduler.task_failed(uid, task_id, reason);
256 }
257 TaskEvent::Offline(task_id, uid, mode) => {
258 self.scheduler
259 .task_cancel(uid, task_id, mode, &mut self.task_count);
260 }
261 };
262 }
263
handle_schedule_event(&mut self, message: ScheduleEvent) -> bool264 fn handle_schedule_event(&mut self, message: ScheduleEvent) -> bool {
265 debug!("TaskManager handle scheduled_message {:?}", message);
266
267 match message {
268 ScheduleEvent::ClearTimeoutTasks => self.clear_timeout_tasks(),
269 ScheduleEvent::LogTasks => self.log_all_task_info(),
270 ScheduleEvent::RestoreAllTasks => self.restore_all_tasks(),
271 ScheduleEvent::Unload => return self.unload_sa(),
272 }
273 false
274 }
275
check_subscriber(&self, task_id: u32, token_id: u64) -> ErrorCode276 fn check_subscriber(&self, task_id: u32, token_id: u64) -> ErrorCode {
277 match RequestDb::get_instance().query_task_token_id(task_id) {
278 Ok(id) if id == token_id => ErrorCode::ErrOk,
279 Ok(_) => ErrorCode::Permission,
280 Err(_) => ErrorCode::TaskNotFound,
281 }
282 }
283
clear_timeout_tasks(&mut self)284 fn clear_timeout_tasks(&mut self) {
285 self.scheduler.clear_timeout_tasks();
286 }
287
log_all_task_info(&self)288 fn log_all_task_info(&self) {
289 self.scheduler.dump_tasks();
290 }
291
restore_all_tasks(&mut self)292 fn restore_all_tasks(&mut self) {
293 self.scheduler.restore_all_tasks();
294 }
295
unload_sa(&mut self) -> bool296 fn unload_sa(&mut self) -> bool {
297 const REQUEST_SERVICE_ID: i32 = 3706;
298
299 if !self.rx.is_empty() {
300 return false;
301 }
302
303 let running_tasks = self.scheduler.running_tasks();
304 if running_tasks != 0 {
305 info!("running {} tasks when unload SA", running_tasks,);
306 return false;
307 }
308
309 // check rx again for there may be new message arrive.
310 if !self.rx.is_empty() {
311 return false;
312 }
313
314 info!("unload SA");
315
316 // failed logic?
317 #[cfg(feature = "oh")]
318 let _ = SystemAbilityManager::unload_system_ability(REQUEST_SERVICE_ID);
319
320 true
321 }
322 }
323
/// Cloneable sending half of the `TaskManagerEvent` channel.
///
/// Every clone posts events to the same manager loop.
// NOTE(review): the type is declared `pub` and the `unreachable_pub` lint is
// silenced; the reason is not visible in this file — confirm it is needed.
#[allow(unreachable_pub)]
#[derive(Clone)]
pub struct TaskManagerTx {
    /// Underlying unbounded sender.
    pub(crate) tx: UnboundedSender<TaskManagerEvent>,
}
329
330 impl TaskManagerTx {
new(tx: UnboundedSender<TaskManagerEvent>) -> Self331 pub(crate) fn new(tx: UnboundedSender<TaskManagerEvent>) -> Self {
332 Self { tx }
333 }
334
send_event(&self, event: TaskManagerEvent) -> bool335 pub(crate) fn send_event(&self, event: TaskManagerEvent) -> bool {
336 if self.tx.send(event).is_err() {
337 #[cfg(feature = "oh")]
338 unsafe {
339 if let Some(e) = PANIC_INFO.as_ref() {
340 error!("Sends TaskManager event failed {}", e);
341 } else {
342 info!("TaskManager is unloading");
343 }
344 }
345 return false;
346 }
347 true
348 }
349
notify_foreground_app_change(&self, uid: u64)350 pub(crate) fn notify_foreground_app_change(&self, uid: u64) {
351 let _ = self.send_event(TaskManagerEvent::State(StateEvent::ForegroundApp(uid)));
352 }
353
notify_app_background(&self, uid: u64)354 pub(crate) fn notify_app_background(&self, uid: u64) {
355 let _ = self.send_event(TaskManagerEvent::State(StateEvent::Background(uid)));
356 }
357
trigger_background_timeout(&self, uid: u64)358 pub(crate) fn trigger_background_timeout(&self, uid: u64) {
359 let _ = self.send_event(TaskManagerEvent::State(StateEvent::BackgroundTimeout(uid)));
360 }
361
notify_special_process_terminate(&self, uid: u64)362 pub(crate) fn notify_special_process_terminate(&self, uid: u64) {
363 let _ = self.send_event(TaskManagerEvent::State(StateEvent::SpecialTerminate(uid)));
364 }
365
show(&self, uid: u64, task_id: u32) -> Option<TaskInfo>366 pub(crate) fn show(&self, uid: u64, task_id: u32) -> Option<TaskInfo> {
367 let (tx, rx) = oneshot::channel();
368 let event = QueryEvent::Show(task_id, uid, tx);
369 let _ = self.send_event(TaskManagerEvent::Query(event));
370 ylong_runtime::block_on(rx).unwrap()
371 }
372
query(&self, task_id: u32, action: Action) -> Option<TaskInfo>373 pub(crate) fn query(&self, task_id: u32, action: Action) -> Option<TaskInfo> {
374 let (tx, rx) = oneshot::channel();
375 let event = QueryEvent::Query(task_id, action, tx);
376 let _ = self.send_event(TaskManagerEvent::Query(event));
377 ylong_runtime::block_on(rx).unwrap()
378 }
379
touch(&self, uid: u64, task_id: u32, token: String) -> Option<TaskInfo>380 pub(crate) fn touch(&self, uid: u64, task_id: u32, token: String) -> Option<TaskInfo> {
381 let (tx, rx) = oneshot::channel();
382 let event = QueryEvent::Touch(task_id, uid, token, tx);
383 let _ = self.send_event(TaskManagerEvent::Query(event));
384 ylong_runtime::block_on(rx).unwrap()
385 }
386 }
387
/// Receiving half of the `TaskManagerEvent` channel.
///
/// Dereferences to the wrapped `UnboundedReceiver`, so the receiver's
/// methods can be called directly on this wrapper.
pub(crate) struct TaskManagerRx {
    /// Underlying unbounded receiver.
    rx: UnboundedReceiver<TaskManagerEvent>,
}
391
392 impl TaskManagerRx {
new(rx: UnboundedReceiver<TaskManagerEvent>) -> Self393 pub(crate) fn new(rx: UnboundedReceiver<TaskManagerEvent>) -> Self {
394 Self { rx }
395 }
396 }
397
398 impl Deref for TaskManagerRx {
399 type Target = UnboundedReceiver<TaskManagerEvent>;
400
deref(&self) -> &Self::Target401 fn deref(&self) -> &Self::Target {
402 &self.rx
403 }
404 }
405
406 impl DerefMut for TaskManagerRx {
deref_mut(&mut self) -> &mut Self::Target407 fn deref_mut(&mut self) -> &mut Self::Target {
408 &mut self.rx
409 }
410 }
411
restore_all_tasks(tx: TaskManagerTx)412 async fn restore_all_tasks(tx: TaskManagerTx) {
413 sleep(Duration::from_secs(RESTORE_ALL_TASKS_INTERVAL)).await;
414 let _ = tx.send_event(TaskManagerEvent::Schedule(ScheduleEvent::RestoreAllTasks));
415 }
416
clear_timeout_tasks(tx: TaskManagerTx)417 async fn clear_timeout_tasks(tx: TaskManagerTx) {
418 loop {
419 sleep(Duration::from_secs(CLEAR_INTERVAL)).await;
420 let _ = tx.send_event(TaskManagerEvent::Schedule(ScheduleEvent::ClearTimeoutTasks));
421 }
422 }
423
log_all_task_info(tx: TaskManagerTx)424 async fn log_all_task_info(tx: TaskManagerTx) {
425 loop {
426 sleep(Duration::from_secs(LOG_INTERVAL)).await;
427 let _ = tx.send_event(TaskManagerEvent::Schedule(ScheduleEvent::LogTasks));
428 }
429 }
430