• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "async_runner.h"
16 
17 #include <cinttypes>
18 
19 #include "async_runner_manager.h"
20 #include "helper/error_helper.h"
21 #include "helper/napi_helper.h"
22 #include "task_manager.h"
23 #include "tools/log.h"
24 
25 namespace Commonlibrary::Concurrent::TaskPoolModule {
26 using namespace Commonlibrary::Concurrent::Common::Helper;
27 static constexpr char EXECUTE_STR[] = "execute";
28 
AsyncRunnerConstructor(napi_env env,napi_callback_info cbinfo)29 napi_value AsyncRunner::AsyncRunnerConstructor(napi_env env, napi_callback_info cbinfo)
30 {
31     size_t argc = 3; // 3 : name, runningCapacity, waitingCapacity
32     napi_value args[3];
33     napi_value thisVar = nullptr;
34     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
35     napi_value runningCapacity = nullptr;
36     napi_value name = nullptr;
37     napi_value waitingCapacity = nullptr;
38     if (argc == 3) { // 3: AsyncRunner(name, runningCapacity, waitingCapacity)
39         name = args[0];
40         runningCapacity = args[1]; // 1: the index of argument runningCapacity
41         waitingCapacity = args[2]; // 2: the index of argument waitingCapacity
42     } else if (argc == 2) { // 2: AsyncRunner(name, runningCapacity) or AsyncRunner(runningCapacity, waitingCapacity)
43         if (NapiHelper::IsString(env, args[0])) {
44             name = args[0];
45             runningCapacity = args[1];
46         } else {
47             runningCapacity = args[0];
48             waitingCapacity = args[1];
49         }
50     } else if (argc == 1) { // 1: AsyncRunner(runningCapacity)
51         runningCapacity = args[0];
52     } else {
53         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "The numbers of params not more than three or less one.");
54         return nullptr;
55     }
56     AsyncRunner* asyncRunner = CheckAndCreateAsyncRunner(env, thisVar, name, runningCapacity, waitingCapacity);
57     if (asyncRunner == nullptr) {
58         HILOG_ERROR("taskpool:: create asyncRunner failed.");
59         return nullptr;
60     }
61 
62     if (AsyncRunnerConstructorInner(env, thisVar, asyncRunner)) {
63         return thisVar;
64     }
65     return nullptr;
66 }
67 
AsyncRunnerConstructorInner(napi_env env,napi_value & thisVar,AsyncRunner * asyncRunner)68 bool AsyncRunner::AsyncRunnerConstructorInner(napi_env env, napi_value& thisVar, AsyncRunner* asyncRunner)
69 {
70     uint64_t asyncRunnerId = reinterpret_cast<uint64_t>(asyncRunner);
71     asyncRunner->asyncRunnerId_ = asyncRunnerId;
72     napi_value napiAsyncRunnerId = NapiHelper::CreateUint64(env, asyncRunnerId);
73     AsyncRunnerManager::GetInstance().StoreAsyncRunner(asyncRunnerId, asyncRunner);
74     napi_property_descriptor properties[] = {
75         DECLARE_NAPI_FUNCTION(EXECUTE_STR, Execute),
76     };
77     napi_define_properties(env, thisVar, sizeof(properties) / sizeof(properties[0]), properties);
78     HILOG_INFO("taskpool:: construct asyncRunner name is %{public}s, asyncRunnerId %{public}s.",
79                asyncRunner->name_.c_str(), std::to_string(asyncRunnerId).c_str());
80     napi_status status = napi_wrap(env, thisVar, asyncRunner, AsyncRunnerDestructor, nullptr, nullptr);
81     if (status != napi_ok) {
82         HILOG_ERROR("taskpool:: AsyncRunnerConstructorInner napi_wrap return value is %{public}d.", status);
83         AsyncRunnerDestructor(env, asyncRunner, nullptr);
84         return false;
85     }
86     return true;
87 }
88 
Execute(napi_env env,napi_callback_info cbinfo)89 napi_value AsyncRunner::Execute(napi_env env, napi_callback_info cbinfo)
90 {
91     size_t argc = 2; // 2 : task, Priority
92     napi_value args[2];
93     napi_value thisVar;
94     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
95     if (argc < 1 || argc > 2) { // 2 : task, Priority
96         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "The numbers of params not more than two or less one.");
97         return nullptr;
98     }
99     napi_value napiTask = args[0];
100     napi_value napiPriority = nullptr;
101     if (argc > 1) {
102         napiPriority = args[1];
103     }
104     if (!CheckExecuteArgs(env, napiTask, napiPriority)) {
105         return nullptr;
106     }
107 
108     AsyncRunner* asyncRunner = nullptr;
109     napi_unwrap(env, thisVar, reinterpret_cast<void**>(&asyncRunner));
110     if (asyncRunner == nullptr) {
111         return nullptr;
112     }
113     Task* task = nullptr;
114     napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
115     if (task == nullptr) {
116         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "The type of param must be task.");
117         return nullptr;
118     }
119     if (!task->CanForAsyncRunner(env)) {
120         return nullptr;
121     }
122     if (napiPriority != nullptr) {
123         uint32_t priority = NapiHelper::GetUint32Value(env, napiPriority);
124         task->asyncTaskPriority_ = static_cast<Priority>(priority);
125     }
126     task->asyncRunnerId_ = asyncRunner->asyncRunnerId_;
127     napi_value promise = task->GetTaskInfoPromise(env, napiTask, TaskType::ASYNCRUNNER_TASK, task->asyncTaskPriority_);
128     if (promise == nullptr) {
129         return nullptr;
130     }
131     if (!AsyncRunnerManager::GetInstance().FindRunnerAndRef(asyncRunner->asyncRunnerId_)) {
132         return nullptr;
133     }
134     if (!AddTasksToAsyncRunner(asyncRunner, task)) {
135         ExecuteTaskImmediately(asyncRunner, task);
136     }
137 
138     return promise;
139 }
140 
CheckAndCreateAsyncRunner(napi_env env,napi_value & thisVar,napi_value name,napi_value runningCapacity,napi_value waitingCapacity)141 AsyncRunner* AsyncRunner::CheckAndCreateAsyncRunner(napi_env env, napi_value& thisVar, napi_value name,
142                                                     napi_value runningCapacity, napi_value waitingCapacity)
143 {
144     std::string nameValue = "";
145     std::uint32_t runningCapacityVal = 0;
146     std::int32_t waitingCapacityValue = 0;
147     if (name != nullptr) {
148         if (!NapiHelper::IsString(env, name)) {
149             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "Name must be string.");
150             return nullptr;
151         }
152         nameValue = NapiHelper::GetString(env, name);
153     }
154     if (runningCapacity == nullptr) {
155         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "RunningCapacity cannot be empty.");
156         return nullptr;
157     }
158     if (!NapiHelper::IsNumber(env, runningCapacity)) {
159         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "RunningCapacity must be number.");
160         return nullptr;
161     }
162     runningCapacityVal = NapiHelper::GetUint32Value(env, runningCapacity);
163     if (runningCapacityVal < 1) {
164         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "RunningCapacity must be greater than zero.");
165         return nullptr;
166     }
167     if (waitingCapacity != nullptr) {
168         if (!NapiHelper::IsNumber(env, waitingCapacity)) {
169             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "WaitingCapacity must be number.");
170             return nullptr;
171         }
172         waitingCapacityValue = NapiHelper::GetInt32Value(env, waitingCapacity);
173         if (waitingCapacityValue < 0) {
174             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
175                 "WaitingCapacity must be greater than or equal zero.");
176             return nullptr;
177         }
178     }
179     AsyncRunner* asyncRunner = nullptr;
180     std::uint32_t waitingCapacityVal = static_cast<std::uint32_t>(waitingCapacityValue);
181     if (nameValue != "") {
182         asyncRunner = AsyncRunnerManager::GetInstance().CreateOrGetGlobalRunner(env, thisVar, nameValue,
183                                                                                 runningCapacityVal, waitingCapacityVal);
184     } else {
185         asyncRunner = new AsyncRunner();
186         asyncRunner->runningCapacity_ = runningCapacityVal;
187         asyncRunner->waitingCapacity_ = waitingCapacityVal;
188     }
189     return asyncRunner;
190 }
191 
CheckExecuteArgs(napi_env env,napi_value napiTask,napi_value napiPriority)192 bool AsyncRunner::CheckExecuteArgs(napi_env env, napi_value napiTask, napi_value napiPriority)
193 {
194     if (!NapiHelper::IsObject(env, napiTask) || !NapiHelper::HasNameProperty(env, napiTask, TASKID_STR)) {
195         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "First param must be task.");
196         return false;
197     }
198     if (napiPriority != nullptr) {
199         if (!NapiHelper::IsNumber(env, napiPriority)) {
200             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "The type of the second param must be number.");
201             return false;
202         }
203         uint32_t priority = NapiHelper::GetUint32Value(env, napiPriority);
204         if (priority >= Priority::NUMBER) {
205             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "Priority value is error.");
206             return false;
207         }
208     }
209     return true;
210 }
211 
ExecuteTaskImmediately(AsyncRunner * asyncRunner,Task * task)212 void AsyncRunner::ExecuteTaskImmediately(AsyncRunner* asyncRunner, Task* task)
213 {
214     HILOG_DEBUG("taskpool:: task %{public}s in asyncRunner %{public}s immediately.",
215                 std::to_string(task->taskId_).c_str(), std::to_string(asyncRunner->asyncRunnerId_).c_str());
216     task->IncreaseRefCount();
217     TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
218     task->taskState_ = ExecuteState::WAITING;
219     TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->asyncTaskPriority_);
220 }
221 
AddTasksToAsyncRunner(AsyncRunner * asyncRunner,Task * task)222 bool AsyncRunner::AddTasksToAsyncRunner(AsyncRunner* asyncRunner, Task* task)
223 {
224     Task* frontTask = nullptr;
225     {
226         std::unique_lock<std::shared_mutex> asyncRunnerLock(asyncRunner->waitingTasksMutex_);
227         if (asyncRunner->runningCount_ < asyncRunner->runningCapacity_) {
228             asyncRunner->runningCount_.fetch_add(1);
229             return false;
230         }
231         if (asyncRunner->waitingCapacity_ && asyncRunner->waitingTasks_.size() == asyncRunner->waitingCapacity_) {
232             frontTask = asyncRunner->waitingTasks_.front();
233             asyncRunner->waitingTasks_.pop_front();
234             frontTask->taskState_ = ExecuteState::CANCELED;
235         }
236         asyncRunner->waitingTasks_.push_back(task);
237     }
238 
239     if (frontTask != nullptr) {
240         asyncRunner->TriggerRejectErrorTimer(frontTask, ErrorHelper::ERR_ASYNCRUNNER_TASK_DISCARDED);
241     }
242     return true;
243 }
244 
AsyncRunnerDestructor(napi_env env,void * data,void * hint)245 void AsyncRunner::AsyncRunnerDestructor(napi_env env, void* data, [[maybe_unused]] void* hint)
246 {
247     AsyncRunner* asyncRunner = static_cast<AsyncRunner*>(data);
248     if (env == nullptr || asyncRunner == nullptr) {
249         return;
250     }
251     auto runner = AsyncRunnerManager::GetInstance().GetAsyncRunner(asyncRunner->asyncRunnerId_);
252     if (runner == nullptr) {
253         return;
254     }
255     AsyncRunnerManager::GetInstance().UnrefAndDestroyRunner(asyncRunner);
256 }
257 
RemoveWaitingTask(Task * task,bool isReject)258 bool AsyncRunner::RemoveWaitingTask(Task* task, bool isReject)
259 {
260     bool flag = false;
261     {
262         std::unique_lock<std::shared_mutex> lock(waitingTasksMutex_);
263         if (waitingTasks_.empty()) {
264             return flag;
265         }
266         auto iter = std::find(waitingTasks_.begin(), waitingTasks_.end(), task);
267         if (iter != waitingTasks_.end()) {
268             waitingTasks_.erase(iter);
269             flag = true;
270         }
271     }
272     if (flag) {
273         if (isReject) {
274             TriggerRejectErrorTimer(task, ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED);
275         } else {
276             AsyncRunnerManager::GetInstance().UnrefAndDestroyRunner(this);
277         }
278     }
279     return flag;
280 }
281 
TriggerRejectErrorTimer(Task * task,int32_t errCode,bool isWaiting)282 void AsyncRunner::TriggerRejectErrorTimer(Task* task, int32_t errCode, bool isWaiting)
283 {
284     if (task == nullptr || !task->IsValid()) {
285         return;
286     }
287     if (!isWaiting) {
288         AsyncRunnerManager::GetInstance().UnrefAndDestroyRunner(this);
289     }
290     DiscardTaskMessage* message = new DiscardTaskMessage(task->env_, task->taskId_, errCode, isWaiting);
291     task->DiscardAsyncRunnerTask(message);
292 }
293 
TriggerWaitingTask()294 void AsyncRunner::TriggerWaitingTask()
295 {
296     std::unique_lock<std::shared_mutex> lock(waitingTasksMutex_);
297     DecreaseRunningCount();
298     Task* task = nullptr;
299     while (runningCount_ < runningCapacity_) {
300         if (waitingTasks_.empty()) {
301             HILOG_DEBUG("taskpool:: asyncRunner %{public}s empty.", std::to_string(asyncRunnerId_).c_str());
302             break;
303         }
304         task = waitingTasks_.front();
305         waitingTasks_.pop_front();
306         runningCount_.fetch_add(1);
307         task->IncreaseRefCount();
308         TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
309         task->taskState_ = ExecuteState::WAITING;
310         HILOG_DEBUG("taskpool:: Trig task %{public}s in asyncRunner %{public}s.",
311                     std::to_string(task->taskId_).c_str(), std::to_string(asyncRunnerId_).c_str());
312         TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->asyncTaskPriority_);
313     }
314 }
315 
CreateGlobalRunner(const std::string & name,uint32_t runningCapacity,uint32_t waitingCapacity)316 AsyncRunner* AsyncRunner::CreateGlobalRunner(const std::string& name, uint32_t runningCapacity,
317                                              uint32_t waitingCapacity)
318 {
319     AsyncRunner* asyncRunner = new AsyncRunner();
320     asyncRunner->waitingCapacity_ = waitingCapacity;
321     asyncRunner->runningCapacity_ = runningCapacity;
322     asyncRunner->isGlobalRunner_ = true;
323     asyncRunner->name_ = name;
324     return asyncRunner;
325 }
326 
DecreaseAsyncCount()327 uint64_t AsyncRunner::DecreaseAsyncCount()
328 {
329     if (refCount_ > 0) {
330         refCount_--;
331     }
332     return refCount_;
333 }
334 
IncreaseAsyncCount()335 void AsyncRunner::IncreaseAsyncCount()
336 {
337     refCount_++;
338 }
339 
CheckGlobalRunnerParams(napi_env env,uint32_t runningCapacity,uint32_t waitingCapacity)340 bool AsyncRunner::CheckGlobalRunnerParams(napi_env env, uint32_t runningCapacity, uint32_t waitingCapacity)
341 {
342     if (runningCapacity != runningCapacity_) {
343         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "AsyncRunner runningCapacity can not changed.");
344         return false;
345     }
346     if (waitingCapacity != waitingCapacity_) {
347         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "AsyncRunner waitingCapacity can not changed.");
348         return false;
349     }
350     return true;
351 }
352 
DecreaseRunningCount()353 void AsyncRunner::DecreaseRunningCount()
354 {
355     if (runningCount_ > 0) {
356         runningCount_.fetch_sub(1);
357     }
358 }
359 } // namespace Commonlibrary::Concurrent::TaskPoolModule