• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "async_runner_manager.h"
17 
18 #include <cinttypes>
19 
20 #include "helper/error_helper.h"
21 #include "task_manager.h"
22 #include "tools/log.h"
23 
24 namespace Commonlibrary::Concurrent::TaskPoolModule {
25 
GetInstance()26 AsyncRunnerManager& AsyncRunnerManager::GetInstance()
27 {
28     static AsyncRunnerManager asyncRunnerManager;
29     return asyncRunnerManager;
30 }
31 
CreateOrGetGlobalRunner(napi_env env,napi_value thisVar,const std::string & name,uint32_t runningCapacity,uint32_t waitingCapacity)32 AsyncRunner* AsyncRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, const std::string& name,
33                                                          uint32_t runningCapacity, uint32_t waitingCapacity)
34 {
35     AsyncRunner *asyncRunner = nullptr;
36     uint64_t asyncRunnerId = 0;
37     {
38         std::unique_lock<std::mutex> lock(globalAsyncRunnerMutex_);
39         auto iter = globalAsyncRunner_.find(name);
40         if (iter == globalAsyncRunner_.end()) {
41             asyncRunner = AsyncRunner::CreateGlobalRunner(name, runningCapacity, waitingCapacity);
42             globalAsyncRunner_.emplace(name, asyncRunner);
43             return asyncRunner;
44         } else {
45             asyncRunner = iter->second;
46             bool res = asyncRunner->CheckGlobalRunnerParams(env, runningCapacity, waitingCapacity);
47             if (!res) {
48                 return nullptr;
49             }
50             asyncRunnerId = asyncRunner->asyncRunnerId_;
51         }
52     }
53     if (!FindRunnerAndRef(asyncRunnerId)) {
54         return nullptr;
55     }
56 
57     return asyncRunner;
58 }
59 
StoreAsyncRunner(uint64_t asyncRunnerId,AsyncRunner * asyncRunner)60 void AsyncRunnerManager::StoreAsyncRunner(uint64_t asyncRunnerId, AsyncRunner* asyncRunner)
61 {
62     std::unique_lock<std::mutex> lock(asyncRunnersMutex_);
63     asyncRunners_.emplace(asyncRunnerId, asyncRunner);
64 }
65 
RemoveAsyncRunner(uint64_t asyncRunnerId)66 void AsyncRunnerManager::RemoveAsyncRunner(uint64_t asyncRunnerId)
67 {
68     asyncRunners_.erase(asyncRunnerId);
69 }
70 
GetAsyncRunner(uint64_t asyncRunnerId)71 AsyncRunner* AsyncRunnerManager::GetAsyncRunner(uint64_t asyncRunnerId)
72 {
73     std::unique_lock<std::mutex> lock(asyncRunnersMutex_);
74     auto iter = asyncRunners_.find(asyncRunnerId);
75     if (iter != asyncRunners_.end()) {
76         return iter->second;
77     }
78     return nullptr;
79 }
80 
TriggerAsyncRunner(napi_env env,Task * lastTask)81 bool AsyncRunnerManager::TriggerAsyncRunner(napi_env env, Task* lastTask)
82 {
83     uint64_t asyncRunnerId = lastTask->asyncRunnerId_;
84     AsyncRunner* asyncRunner = GetAsyncRunner(asyncRunnerId);
85     if (asyncRunner == nullptr) {
86         HILOG_ERROR("taskpool:: trigger asyncRunner not exist.");
87         return false;
88     }
89     if (UnrefAndDestroyRunner(asyncRunner)) {
90         HILOG_ERROR("taskpool:: trigger asyncRunner is remove.");
91         return false;
92     }
93     asyncRunner->TriggerWaitingTask();
94     return true;
95 }
96 
RemoveGlobalAsyncRunner(const std::string & name)97 void AsyncRunnerManager::RemoveGlobalAsyncRunner(const std::string& name)
98 {
99     std::unique_lock<std::mutex> lock(globalAsyncRunnerMutex_);
100     auto iter = globalAsyncRunner_.find(name);
101     if (iter != globalAsyncRunner_.end()) {
102         globalAsyncRunner_.erase(iter);
103     }
104 }
105 
CancelAsyncRunnerTask(napi_env env,Task * task)106 void AsyncRunnerManager::CancelAsyncRunnerTask(napi_env env, Task* task)
107 {
108     std::string errMsg = "";
109     ExecuteState state = ExecuteState::NOT_FOUND;
110     {
111         std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
112         if (task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) {
113             errMsg = "AsyncRunner task has been executed.";
114             HILOG_ERROR("taskpool:: %{public}s", errMsg.c_str());
115             ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
116             return;
117         }
118 
119         state = task->taskState_.exchange(ExecuteState::CANCELED);
120     }
121     task->CancelPendingTask(env);
122     auto asyncRunner = GetAsyncRunner(task->asyncRunnerId_);
123     if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr &&
124         TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority)) {
125         task->DecreaseTaskLifecycleCount();
126         TaskManager::GetInstance().DecreaseSendDataRefCount(task->env_, task->taskId_);
127         if (asyncRunner != nullptr) {
128             asyncRunner->TriggerRejectErrorTimer(task, ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED, true);
129         }
130         TriggerAsyncRunner(env, task);
131     }
132 
133     if (asyncRunner != nullptr) {
134         asyncRunner->RemoveWaitingTask(task);
135     }
136 }
137 
RemoveWaitingTask(Task * task)138 void AsyncRunnerManager::RemoveWaitingTask(Task* task)
139 {
140     auto asyncRunner = GetAsyncRunner(task->asyncRunnerId_);
141     if (asyncRunner != nullptr) {
142         asyncRunner->RemoveWaitingTask(task, false);
143     }
144 }
145 
FindRunnerAndRef(uint64_t asyncRunnerId)146 bool AsyncRunnerManager::FindRunnerAndRef(uint64_t asyncRunnerId)
147 {
148     std::unique_lock<std::mutex> lock(asyncRunnersMutex_);
149     auto iter = asyncRunners_.find(asyncRunnerId);
150     if (iter == asyncRunners_.end()) {
151         HILOG_ERROR("taskpool:: asyncRunner not exist.");
152         return false;
153     }
154     iter->second->IncreaseAsyncCount();
155     return true;
156 }
157 
UnrefAndDestroyRunner(AsyncRunner * asyncRunner)158 bool AsyncRunnerManager::UnrefAndDestroyRunner(AsyncRunner* asyncRunner)
159 {
160     {
161         std::unique_lock<std::mutex> lock(asyncRunnersMutex_);
162         if (asyncRunner->DecreaseAsyncCount() != 0) {
163             return false;
164         }
165         RemoveAsyncRunner(asyncRunner->asyncRunnerId_);
166     }
167     if (asyncRunner->isGlobalRunner_) {
168         RemoveGlobalAsyncRunner(asyncRunner->name_);
169     }
170     delete asyncRunner;
171     asyncRunner = nullptr;
172     return true;
173 }
174 
DecreaseRunningCount(uint64_t asyncRunnerId)175 void AsyncRunnerManager::DecreaseRunningCount(uint64_t asyncRunnerId)
176 {
177     std::unique_lock<std::mutex> lock(asyncRunnersMutex_);
178     auto iter = asyncRunners_.find(asyncRunnerId);
179     if (iter == asyncRunners_.end()) {
180         return;
181     }
182     iter->second->DecreaseRunningCount();
183 }
184 } // namespace Commonlibrary::Concurrent::TaskPoolModule