1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16
17 #include "sequence_runner_manager.h"
18
19 #include "helper/error_helper.h"
20 #include "helper/hitrace_helper.h"
21 #include "sequence_runner.h"
22 #include "task_manager.h"
23
24 namespace Commonlibrary::Concurrent::TaskPoolModule {
25 using namespace Commonlibrary::Concurrent::Common::Helper;
26
GetInstance()27 SequenceRunnerManager& SequenceRunnerManager::GetInstance()
28 {
29 static SequenceRunnerManager sequenceRunnerManager;
30 return sequenceRunnerManager;
31 }
32
CreateOrGetGlobalRunner(napi_env env,napi_value thisVar,size_t argc,const std::string & name,uint32_t priority)33 SequenceRunner* SequenceRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc,
34 const std::string& name, uint32_t priority)
35 {
36 std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
37 SequenceRunner* seqRunner = nullptr;
38 auto iter = globalSeqRunner_.find(name);
39 if (iter == globalSeqRunner_.end()) {
40 Priority priorityVal = Priority::DEFAULT;
41 if (argc == 2) { // 2: The number of parameters is 2.
42 priorityVal = static_cast<Priority>(priority);
43 }
44 seqRunner = new SequenceRunner(priorityVal, name, true);
45 globalSeqRunner_.emplace(name, seqRunner);
46 } else {
47 seqRunner = iter->second;
48 if (priority != seqRunner->priority_) {
49 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: priority can not changed.");
50 return nullptr;
51 }
52 if (!FindRunnerAndRef(seqRunner->seqRunnerId_)) {
53 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: seqRunner not exist.");
54 return nullptr;
55 }
56 if (seqRunner->seqRunnerTasks_.empty()) {
57 seqRunner->currentTaskId_ = 0;
58 }
59 }
60
61 return seqRunner;
62 }
63
RemoveSequenceRunnerByName(const std::string & name)64 void SequenceRunnerManager::RemoveSequenceRunnerByName(const std::string& name)
65 {
66 std::unique_lock<std::mutex> globalLock(globalSeqRunnerMutex_);
67 auto iter = globalSeqRunner_.find(name.c_str());
68 if (iter != globalSeqRunner_.end()) {
69 globalSeqRunner_.erase(iter->first);
70 }
71 }
72
SequenceRunnerDestructor(SequenceRunner * seqRunner)73 void SequenceRunnerManager::SequenceRunnerDestructor(SequenceRunner* seqRunner)
74 {
75 auto runner = GetSeqRunner(seqRunner->seqRunnerId_);
76 if (runner == nullptr) {
77 return;
78 }
79 UnrefAndDestroyRunner(seqRunner);
80 }
81
StoreSequenceRunner(uint64_t seqRunnerId,SequenceRunner * seqRunner)82 void SequenceRunnerManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner)
83 {
84 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
85 seqRunners_.emplace(seqRunnerId, seqRunner);
86 }
87
RemoveSequenceRunner(uint64_t seqRunnerId)88 void SequenceRunnerManager::RemoveSequenceRunner(uint64_t seqRunnerId)
89 {
90 seqRunners_.erase(seqRunnerId);
91 }
92
GetSeqRunner(uint64_t seqRunnerId)93 SequenceRunner* SequenceRunnerManager::GetSeqRunner(uint64_t seqRunnerId)
94 {
95 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
96 auto iter = seqRunners_.find(seqRunnerId);
97 if (iter != seqRunners_.end()) {
98 return iter->second;
99 }
100 HILOG_DEBUG("taskpool:: sequenceRunner has been released.");
101 return nullptr;
102 }
103
AddTaskToSeqRunner(uint64_t seqRunnerId,Task * task)104 void SequenceRunnerManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task)
105 {
106 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
107 auto iter = seqRunners_.find(seqRunnerId);
108 if (iter == seqRunners_.end()) {
109 HILOG_ERROR("seqRunner:: seqRunner not found.");
110 return;
111 } else {
112 iter->second->AddTask(task);
113 }
114 }
115
TriggerSeqRunner(napi_env env,Task * lastTask)116 bool SequenceRunnerManager::TriggerSeqRunner(napi_env env, Task* lastTask)
117 {
118 uint64_t seqRunnerId = lastTask->seqRunnerId_;
119 SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId);
120 if (seqRunner == nullptr) {
121 HILOG_ERROR("taskpool:: trigger seqRunner not exist.");
122 return false;
123 }
124 if (UnrefAndDestroyRunner(seqRunner)) {
125 HILOG_ERROR("taskpool:: trigger seqRunner is removed.");
126 return false;
127 }
128 if (seqRunner->currentTaskId_ != lastTask->taskId_) {
129 HILOG_ERROR("taskpool:: only front task can trigger seqRunner.");
130 return false;
131 }
132 seqRunner->TriggerTask(env);
133 return true;
134 }
135
RemoveWaitingTask(Task * task)136 void SequenceRunnerManager::RemoveWaitingTask(Task* task)
137 {
138 auto seqRunner = GetSeqRunner(task->seqRunnerId_);
139 if (seqRunner == nullptr) {
140 return;
141 }
142 if (seqRunner->RemoveWaitingTask(task)) {
143 UnrefAndDestroyRunner(seqRunner);
144 }
145 }
146
FindRunnerAndRef(uint64_t seqRunnerId)147 bool SequenceRunnerManager::FindRunnerAndRef(uint64_t seqRunnerId)
148 {
149 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
150 auto iter = seqRunners_.find(seqRunnerId);
151 if (iter == seqRunners_.end()) {
152 HILOG_ERROR("taskpool:: seqRunner not exist.");
153 return false;
154 }
155 iter->second->IncreaseSeqCount();
156 return true;
157 }
158
UnrefAndDestroyRunner(SequenceRunner * seqRunner)159 bool SequenceRunnerManager::UnrefAndDestroyRunner(SequenceRunner* seqRunner)
160 {
161 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
162 if (seqRunner->DecreaseSeqCount() != 0) {
163 return false;
164 }
165 if (seqRunner->isGlobalRunner_) {
166 RemoveSequenceRunnerByName(seqRunner->seqName_);
167 }
168 RemoveSequenceRunner(seqRunner->seqRunnerId_);
169 delete seqRunner;
170 return true;
171 }
172 } // namespace Commonlibrary::Concurrent::TaskPoolModule