• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 
17 #include "sequence_runner_manager.h"
18 
19 #include "helper/error_helper.h"
20 #include "helper/hitrace_helper.h"
21 #include "sequence_runner.h"
22 #include "task_manager.h"
23 
24 namespace Commonlibrary::Concurrent::TaskPoolModule {
25 using namespace Commonlibrary::Concurrent::Common::Helper;
26 
GetInstance()27 SequenceRunnerManager& SequenceRunnerManager::GetInstance()
28 {
29     static SequenceRunnerManager sequenceRunnerManager;
30     return sequenceRunnerManager;
31 }
32 
CreateOrGetGlobalRunner(napi_env env,napi_value thisVar,size_t argc,const std::string & name,uint32_t priority)33 SequenceRunner* SequenceRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc,
34                                                                const std::string& name, uint32_t priority)
35 {
36     std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
37     SequenceRunner* seqRunner = nullptr;
38     auto iter = globalSeqRunner_.find(name);
39     if (iter == globalSeqRunner_.end()) {
40         Priority priorityVal = Priority::DEFAULT;
41         if (argc == 2) { // 2: The number of parameters is 2.
42             priorityVal = static_cast<Priority>(priority);
43         }
44         seqRunner = new SequenceRunner(priorityVal, name, true);
45         globalSeqRunner_.emplace(name, seqRunner);
46     } else {
47         seqRunner = iter->second;
48         if (priority != seqRunner->priority_) {
49             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: priority can not changed.");
50             return nullptr;
51         }
52         if (!FindRunnerAndRef(seqRunner->seqRunnerId_)) {
53             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: seqRunner not exist.");
54             return nullptr;
55         }
56         if (seqRunner->seqRunnerTasks_.empty()) {
57             seqRunner->currentTaskId_ = 0;
58         }
59     }
60 
61     return seqRunner;
62 }
63 
RemoveSequenceRunnerByName(const std::string & name)64 void SequenceRunnerManager::RemoveSequenceRunnerByName(const std::string& name)
65 {
66     std::unique_lock<std::mutex> globalLock(globalSeqRunnerMutex_);
67     auto iter = globalSeqRunner_.find(name.c_str());
68     if (iter != globalSeqRunner_.end()) {
69         globalSeqRunner_.erase(iter->first);
70     }
71 }
72 
SequenceRunnerDestructor(SequenceRunner * seqRunner)73 void SequenceRunnerManager::SequenceRunnerDestructor(SequenceRunner* seqRunner)
74 {
75     auto runner = GetSeqRunner(seqRunner->seqRunnerId_);
76     if (runner == nullptr) {
77         return;
78     }
79     UnrefAndDestroyRunner(seqRunner);
80 }
81 
StoreSequenceRunner(uint64_t seqRunnerId,SequenceRunner * seqRunner)82 void SequenceRunnerManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner)
83 {
84     std::unique_lock<std::mutex> lock(seqRunnersMutex_);
85     seqRunners_.emplace(seqRunnerId, seqRunner);
86 }
87 
RemoveSequenceRunner(uint64_t seqRunnerId)88 void SequenceRunnerManager::RemoveSequenceRunner(uint64_t seqRunnerId)
89 {
90     seqRunners_.erase(seqRunnerId);
91 }
92 
GetSeqRunner(uint64_t seqRunnerId)93 SequenceRunner* SequenceRunnerManager::GetSeqRunner(uint64_t seqRunnerId)
94 {
95     std::unique_lock<std::mutex> lock(seqRunnersMutex_);
96     auto iter = seqRunners_.find(seqRunnerId);
97     if (iter != seqRunners_.end()) {
98         return iter->second;
99     }
100     HILOG_DEBUG("taskpool:: sequenceRunner has been released.");
101     return nullptr;
102 }
103 
AddTaskToSeqRunner(uint64_t seqRunnerId,Task * task)104 void SequenceRunnerManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task)
105 {
106     std::unique_lock<std::mutex> lock(seqRunnersMutex_);
107     auto iter = seqRunners_.find(seqRunnerId);
108     if (iter == seqRunners_.end()) {
109         HILOG_ERROR("seqRunner:: seqRunner not found.");
110         return;
111     } else {
112         iter->second->AddTask(task);
113     }
114 }
115 
TriggerSeqRunner(napi_env env,Task * lastTask)116 bool SequenceRunnerManager::TriggerSeqRunner(napi_env env, Task* lastTask)
117 {
118     uint64_t seqRunnerId = lastTask->seqRunnerId_;
119     SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId);
120     if (seqRunner == nullptr) {
121         HILOG_ERROR("taskpool:: trigger seqRunner not exist.");
122         return false;
123     }
124     if (UnrefAndDestroyRunner(seqRunner)) {
125         HILOG_ERROR("taskpool:: trigger seqRunner is removed.");
126         return false;
127     }
128     if (seqRunner->currentTaskId_ != lastTask->taskId_) {
129         HILOG_ERROR("taskpool:: only front task can trigger seqRunner.");
130         return false;
131     }
132     seqRunner->TriggerTask(env);
133     return true;
134 }
135 
RemoveWaitingTask(Task * task)136 void SequenceRunnerManager::RemoveWaitingTask(Task* task)
137 {
138     auto seqRunner = GetSeqRunner(task->seqRunnerId_);
139     if (seqRunner == nullptr) {
140         return;
141     }
142     if (seqRunner->RemoveWaitingTask(task)) {
143         UnrefAndDestroyRunner(seqRunner);
144     }
145 }
146 
FindRunnerAndRef(uint64_t seqRunnerId)147 bool SequenceRunnerManager::FindRunnerAndRef(uint64_t seqRunnerId)
148 {
149     std::unique_lock<std::mutex> lock(seqRunnersMutex_);
150     auto iter = seqRunners_.find(seqRunnerId);
151     if (iter == seqRunners_.end()) {
152         HILOG_ERROR("taskpool:: seqRunner not exist.");
153         return false;
154     }
155     iter->second->IncreaseSeqCount();
156     return true;
157 }
158 
UnrefAndDestroyRunner(SequenceRunner * seqRunner)159 bool SequenceRunnerManager::UnrefAndDestroyRunner(SequenceRunner* seqRunner)
160 {
161     std::unique_lock<std::mutex> lock(seqRunnersMutex_);
162     if (seqRunner->DecreaseSeqCount() != 0) {
163         return false;
164     }
165     if (seqRunner->isGlobalRunner_) {
166         RemoveSequenceRunnerByName(seqRunner->seqName_);
167     }
168     RemoveSequenceRunner(seqRunner->seqRunnerId_);
169     delete seqRunner;
170     return true;
171 }
172 } // namespace Commonlibrary::Concurrent::TaskPoolModule