• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <hisysevent.h>
16 #include <ipc_skeleton.h>
17 
18 #include "work_queue_manager.h"
19 #include "work_scheduler_service.h"
20 #include "work_sched_hilog.h"
21 #include "work_sched_utils.h"
22 
23 using namespace std;
24 
25 namespace OHOS {
26 namespace WorkScheduler {
// Default timer cycle handed to timeCycle_ by the constructor: 10 * 60 * 1000 (10min).
constexpr uint32_t TIME_CYCLE = 10 * 60 * 1000;
// Most recent value passed to SetTimeRetrigger(); stays INT32_MAX until that is first called.
static int32_t g_timeRetrigger = INT32_MAX;
29 
WorkQueueManager(const std::shared_ptr<WorkSchedulerService> & wss)30 WorkQueueManager::WorkQueueManager(const std::shared_ptr<WorkSchedulerService>& wss) : wss_(wss)
31 {
32     timeCycle_ = TIME_CYCLE;
33 }
34 
Init()35 bool WorkQueueManager::Init()
36 {
37     return true;
38 }
39 
AddListener(WorkCondition::Type type,shared_ptr<IConditionListener> listener)40 bool WorkQueueManager::AddListener(WorkCondition::Type type, shared_ptr<IConditionListener> listener)
41 {
42     std::lock_guard<ffrt::mutex> lock(mutex_);
43     if (listenerMap_.count(type) > 0) {
44         return false;
45     }
46     listenerMap_.emplace(type, listener);
47     return true;
48 }
49 
AddWork(shared_ptr<WorkStatus> workStatus)50 bool WorkQueueManager::AddWork(shared_ptr<WorkStatus> workStatus)
51 {
52     if (!workStatus || !workStatus->workInfo_ || !workStatus->workInfo_->GetConditionMap()) {
53         return false;
54     }
55     WS_HILOGD("workStatus ID: %{public}s", workStatus->workId_.c_str());
56     std::lock_guard<ffrt::mutex> lock(mutex_);
57     auto map = workStatus->workInfo_->GetConditionMap();
58     for (auto it : *map) {
59         if (queueMap_.count(it.first) == 0) {
60             queueMap_.emplace(it.first, make_shared<WorkQueue>());
61             if (listenerMap_.count(it.first) != 0) {
62                 listenerMap_.at(it.first)->Start();
63             }
64         }
65         queueMap_.at(it.first)->Push(workStatus);
66     }
67     if (WorkSchedUtils::IsSystemApp()) {
68         WS_HILOGI("Is system app, default group is active.");
69         workStatus->workInfo_->SetCallBySystemApp(true);
70     }
71     return true;
72 }
73 
RemoveWork(shared_ptr<WorkStatus> workStatus)74 bool WorkQueueManager::RemoveWork(shared_ptr<WorkStatus> workStatus)
75 {
76     std::lock_guard<ffrt::mutex> lock(mutex_);
77     WS_HILOGD("workStatus ID: %{public}s", workStatus->workId_.c_str());
78     auto map = workStatus->workInfo_->GetConditionMap();
79     for (auto it : *map) {
80         if (queueMap_.count(it.first) > 0) {
81             queueMap_.at(it.first)->Remove(workStatus);
82         }
83         if (queueMap_.count(it.first) == 0) {
84             listenerMap_.at(it.first)->Stop();
85         }
86     }
87     return true;
88 }
89 
CancelWork(shared_ptr<WorkStatus> workStatus)90 bool WorkQueueManager::CancelWork(shared_ptr<WorkStatus> workStatus)
91 {
92     std::lock_guard<ffrt::mutex> lock(mutex_);
93     WS_HILOGD("workStatus ID: %{public}s", workStatus->workId_.c_str());
94     for (auto it : queueMap_) {
95         it.second->CancelWork(workStatus);
96         if (queueMap_.count(it.first) == 0) {
97             listenerMap_.at(it.first)->Stop();
98         }
99     }
100     // Notify work remove event to battery statistics
101     int32_t pid = IPCSkeleton::GetCallingPid();
102     HiSysEventWrite(HiviewDFX::HiSysEvent::Domain::WORK_SCHEDULER,
103         "WORK_REMOVE", HiviewDFX::HiSysEvent::EventType::STATISTIC, "UID", workStatus->uid_,
104         "PID", pid, "NAME", workStatus->bundleName_, "WORKID", workStatus->workId_);
105     return true;
106 }
107 
GetReayQueue(WorkCondition::Type conditionType,shared_ptr<DetectorValue> conditionVal)108 vector<shared_ptr<WorkStatus>> WorkQueueManager::GetReayQueue(WorkCondition::Type conditionType,
109     shared_ptr<DetectorValue> conditionVal)
110 {
111     vector<shared_ptr<WorkStatus>> result;
112     std::lock_guard<ffrt::mutex> lock(mutex_);
113     if (conditionType != WorkCondition::Type::GROUP && queueMap_.count(conditionType) > 0) {
114         shared_ptr<WorkQueue> workQueue = queueMap_.at(conditionType);
115         result = workQueue->OnConditionChanged(conditionType, conditionVal);
116     }
117     if (conditionType == WorkCondition::Type::GROUP || conditionType == WorkCondition::Type::STANDBY) {
118         for (auto it : queueMap_) {
119             shared_ptr<WorkQueue> workQueue = it.second;
120             auto works = workQueue->OnConditionChanged(conditionType, conditionVal);
121             PushWork(works, result);
122         }
123     }
124     bool hasStop = false;
125     auto it = result.begin();
126     while (it != result.end()) {
127         if (!(*it)->needRetrigger_) {
128             ++it;
129             continue;
130         }
131         if (conditionType != WorkCondition::Type::TIMER
132                 && conditionType != WorkCondition::Type::GROUP) {
133             WS_HILOGI("Need retrigger, start group listener, bundleName:%{public}s, workId:%{public}s",
134                 (*it)->bundleName_.c_str(), (*it)->workId_.c_str());
135             SetTimeRetrigger((*it)->timeRetrigger_);
136             if (!hasStop) {
137                 listenerMap_.at(WorkCondition::Type::GROUP)->Stop();
138                 hasStop = true;
139             }
140             listenerMap_.at(WorkCondition::Type::GROUP)->Start();
141         }
142         (*it)->needRetrigger_ = false;
143         (*it)->timeRetrigger_ = INT32_MAX;
144         it = result.erase(it);
145     }
146     PrintWorkStatus(conditionType);
147     ClearTimeOutWorkStatus();
148     return result;
149 }
150 
ClearTimeOutWorkStatus()151 void WorkQueueManager::ClearTimeOutWorkStatus()
152 {
153     std::set<std::string> allWorkIds;
154     for (auto it : queueMap_) {
155         shared_ptr<WorkQueue> workQueue = it.second;
156         auto workList = workQueue->GetWorkList();
157         for (auto work : workList) {
158             if (!work->IsTimeout()) {
159                 continue;
160             }
161             if (allWorkIds.count(work->workId_) != 0) {
162                 continue;
163             }
164             allWorkIds.insert(work->workId_);
165             WS_HILOGE("work timed out and will be ended, bundleName:%{public}s, workId:%{public}s",
166                 work->bundleName_.c_str(), work->workId_.c_str());
167             AsyncStopWork(work);
168         }
169     }
170 }
171 
PrintWorkStatus(WorkCondition::Type conditionType)172 void WorkQueueManager::PrintWorkStatus(WorkCondition::Type conditionType)
173 {
174     if (conditionType == WorkCondition::Type::GROUP || conditionType == WorkCondition::Type::STANDBY) {
175         PrintAllWorkStatus(conditionType);
176         return;
177     }
178     if (queueMap_.count(conditionType) > 0) {
179         shared_ptr<WorkQueue> workQueue = queueMap_.at(conditionType);
180         auto workList = workQueue->GetWorkList();
181         for (auto work : workList) {
182             work->ToString(conditionType);
183         }
184     }
185 }
186 
PrintAllWorkStatus(WorkCondition::Type conditionType)187 void WorkQueueManager::PrintAllWorkStatus(WorkCondition::Type conditionType)
188 {
189     std::set<std::string> allWorkIds;
190     for (auto it : queueMap_) {
191         shared_ptr<WorkQueue> workQueue = it.second;
192         auto workList = workQueue->GetWorkList();
193         for (auto work : workList) {
194             if (allWorkIds.count(work->workId_) != 0) {
195                 continue;
196             }
197             allWorkIds.insert(work->workId_);
198             work->ToString(conditionType);
199         }
200     }
201 }
202 
PushWork(vector<shared_ptr<WorkStatus>> & works,vector<shared_ptr<WorkStatus>> & result)203 void WorkQueueManager::PushWork(vector<shared_ptr<WorkStatus>> &works, vector<shared_ptr<WorkStatus>> &result)
204 {
205     for (const auto &work : works) {
206         auto iter = std::find_if(result.begin(), result.end(),
207         [&](const auto &existingWork) {
208             return existingWork->workId_ == work->workId_;
209         });
210         if (iter != result.end()) {
211             WS_HILOGE("WorkId:%{public}s existing, bundleName:%{public}s",
212                 work->workId_.c_str(), work->bundleName_.c_str());
213             continue;
214         }
215         result.push_back(work);
216     }
217 }
218 
OnConditionChanged(WorkCondition::Type conditionType,shared_ptr<DetectorValue> conditionVal)219 void WorkQueueManager::OnConditionChanged(WorkCondition::Type conditionType,
220     shared_ptr<DetectorValue> conditionVal)
221 {
222     auto service = wss_.lock();
223     if (!service) {
224         WS_HILOGE("service is null");
225         return;
226     }
227     auto task = [weak = weak_from_this(), service, conditionType, conditionVal]() {
228         auto strong = weak.lock();
229         if (!strong) {
230             WS_HILOGE("strong is null");
231             return;
232         }
233         vector<shared_ptr<WorkStatus>> readyWorkVector = strong->GetReayQueue(conditionType, conditionVal);
234         if (readyWorkVector.size() == 0) {
235             return;
236         }
237         for (auto it : readyWorkVector) {
238             it->MarkStatus(WorkStatus::Status::CONDITION_READY);
239         }
240         service->OnConditionReady(make_shared<vector<shared_ptr<WorkStatus>>>(readyWorkVector));
241     };
242     auto handler = service->GetHandler();
243     if (!handler) {
244         WS_HILOGE("handler is null");
245         return;
246     }
247     handler->PostTask(task);
248 }
249 
StopAndClearWorks(list<shared_ptr<WorkStatus>> workList)250 bool WorkQueueManager::StopAndClearWorks(list<shared_ptr<WorkStatus>> workList)
251 {
252     for (auto &it : workList) {
253         CancelWork(it);
254     }
255     return true;
256 }
257 
Dump(string & result)258 void WorkQueueManager::Dump(string& result)
259 {
260     std::lock_guard<ffrt::mutex> lock(mutex_);
261     string conditionType[] = {"network", "charger", "battery_status", "battery_level",
262         "storage", "timer", "group", "deepIdle", "standby", "unknown"};
263     uint32_t size = sizeof(conditionType) / sizeof(conditionType[0]);
264     for (auto it : queueMap_) {
265         if (it.first < size) {
266             result.append(conditionType[it.first]);
267         } else {
268             result.append(conditionType[size - 1]);
269         }
270         result.append(" : ");
271         result.append("[");
272         string workIdStr;
273         it.second->GetWorkIdStr(workIdStr);
274         result.append(workIdStr);
275         result.append("]\n");
276     }
277 }
278 
SetTimeCycle(uint32_t time)279 void WorkQueueManager::SetTimeCycle(uint32_t time)
280 {
281     timeCycle_ = time;
282     listenerMap_.at(WorkCondition::Type::TIMER)->Stop();
283     listenerMap_.at(WorkCondition::Type::TIMER)->Start();
284 }
285 
GetTimeCycle()286 uint32_t WorkQueueManager::GetTimeCycle()
287 {
288     return timeCycle_;
289 }
290 
SetTimeRetrigger(int32_t time)291 void WorkQueueManager::SetTimeRetrigger(int32_t time)
292 {
293     g_timeRetrigger = time;
294 }
295 
GetTimeRetrigger()296 int32_t WorkQueueManager::GetTimeRetrigger()
297 {
298     return g_timeRetrigger;
299 }
300 
SetMinIntervalByDump(int64_t interval)301 void WorkQueueManager::SetMinIntervalByDump(int64_t interval)
302 {
303     for (auto it : queueMap_) {
304         it.second->SetMinIntervalByDump(interval);
305     }
306 }
307 
AsyncStopWork(std::shared_ptr<WorkStatus> workStatus)308 void WorkQueueManager::AsyncStopWork(std::shared_ptr<WorkStatus> workStatus)
309 {
310     auto service = wss_.lock();
311     if (!service) {
312         WS_HILOGE("service is null");
313         return;
314     }
315     auto task = [weak = weak_from_this(), service, workStatus]() {
316         auto strong = weak.lock();
317         if (!strong) {
318             WS_HILOGE("strong is null");
319             return;
320         }
321         service->StopWorkInner(workStatus, workStatus->uid_, false, false);
322         workStatus->SetTimeout(false);
323     };
324     auto handler = service->GetHandler();
325     if (!handler) {
326         WS_HILOGE("handler is null");
327         return;
328     }
329     handler->PostTask(task);
330 }
331 } // namespace WorkScheduler
332 } // namespace OHOS