1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "sequence_runner.h"
16
17 #include "sequence_runner_manager.h"
18 #include "task_manager.h"
19
20 namespace Commonlibrary::Concurrent::TaskPoolModule {
21 using namespace Commonlibrary::Concurrent::Common::Helper;
22 static constexpr char EXECUTE_STR[] = "execute";
23 static constexpr char SEQ_RUNNER_ID_STR[] = "seqRunnerId";
24
SeqRunnerConstructorInner(napi_env env,napi_value & thisVar,SequenceRunner * seqRunner)25 bool SequenceRunner::SeqRunnerConstructorInner(napi_env env, napi_value& thisVar, SequenceRunner* seqRunner)
26 {
27 // update seqRunner.seqRunnerId
28 uint64_t seqRunnerId = reinterpret_cast<uint64_t>(seqRunner);
29 napi_value napiSeqRunnerId = NapiHelper::CreateUint64(env, seqRunnerId);
30 SequenceRunnerManager::GetInstance().StoreSequenceRunner(seqRunnerId, seqRunner);
31 napi_property_descriptor properties[] = {
32 DECLARE_NAPI_PROPERTY(SEQ_RUNNER_ID_STR, napiSeqRunnerId),
33 DECLARE_NAPI_FUNCTION(EXECUTE_STR, Execute),
34 };
35 napi_define_properties(env, thisVar, sizeof(properties) / sizeof(properties[0]), properties);
36 HILOG_INFO("taskpool:: construct seqRunner name is %{public}s, seqRunnerid %{public}s.",
37 seqRunner->seqName_.c_str(), std::to_string(seqRunnerId).c_str());
38
39 seqRunner->seqRunnerId_ = seqRunnerId;
40 napi_status status = napi_wrap(env, thisVar, seqRunner, SequenceRunnerDestructor, nullptr, nullptr);
41 if (status != napi_ok) {
42 HILOG_ERROR("taskpool::SeqRunnerConstructorInner napi_wrap return value is %{public}d", status);
43 SequenceRunnerDestructor(env, seqRunner, nullptr);
44 return false;
45 }
46 return true;
47 }
48
SeqRunnerConstructor(napi_env env,napi_callback_info cbinfo)49 napi_value SequenceRunner::SeqRunnerConstructor(napi_env env, napi_callback_info cbinfo)
50 {
51 // get input args out of env and cbinfo
52 size_t argc = 2; // 2: The maximum number of parameters is 2
53 napi_value args[2]; // 2: The maximum number of parameters is 2
54 napi_value thisVar;
55 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
56
57 uint32_t priority = Priority::DEFAULT;
58 std::string name = "";
59 if (argc == 2) { // 2: The number of parameters is 2, if the first is seqRunner name, the second must be priority
60 if (NapiHelper::IsString(env, args[0]) && NapiHelper::IsNumber(env, args[1])) {
61 name = NapiHelper::GetString(env, args[0]);
62 priority = NapiHelper::GetUint32Value(env, args[1]);
63 if (priority >= Priority::NUMBER) {
64 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value unvalied.");
65 return nullptr;
66 }
67 } else {
68 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
69 "the type of first param must be string and the type of second param must be string.");
70 return nullptr;
71 }
72 } else if (argc == 1) {
73 if (NapiHelper::IsString(env, args[0])) {
74 name = NapiHelper::GetString(env, args[0]);
75 } else if (NapiHelper::IsNumber(env, args[0])) {
76 priority = NapiHelper::GetUint32Value(env, args[0]);
77 if (priority >= Priority::NUMBER) {
78 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value unvalied.");
79 return nullptr;
80 }
81 } else {
82 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of first param must be string or number.");
83 return nullptr;
84 }
85 }
86
87 SequenceRunner* seqRunner = nullptr;
88 if (name != "") {
89 seqRunner = SequenceRunnerManager::GetInstance().CreateOrGetGlobalRunner(env, thisVar, argc, name, priority);
90 if (seqRunner == nullptr) {
91 HILOG_ERROR("taskpool:: create or get globalRunner failed");
92 return nullptr;
93 }
94 } else {
95 seqRunner = new SequenceRunner(static_cast<Priority>(priority));
96 }
97
98 if (!SeqRunnerConstructorInner(env, thisVar, seqRunner)) {
99 HILOG_ERROR("taskpool:: SeqRunnerConstructorInner failed");
100 return nullptr;
101 }
102 return thisVar;
103 }
104
Execute(napi_env env,napi_callback_info cbinfo)105 napi_value SequenceRunner::Execute(napi_env env, napi_callback_info cbinfo)
106 {
107 size_t argc = 1;
108 napi_value args[1];
109 napi_value thisVar;
110 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
111 std::string errMessage = "";
112 if (argc < 1) {
113 errMessage = "seqRunner:: number of params at least one";
114 HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
115 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of param at least one.");
116 return nullptr;
117 }
118 if (!NapiHelper::IsObject(env, args[0]) || !NapiHelper::HasNameProperty(env, args[0], TASKID_STR)) {
119 errMessage = "seqRunner:: first param must be task.";
120 HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
121 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be task.");
122 return nullptr;
123 }
124 napi_value napiSeqRunnerId = NapiHelper::GetNameProperty(env, thisVar, SEQ_RUNNER_ID_STR);
125 uint64_t seqRunnerId = NapiHelper::GetUint64Value(env, napiSeqRunnerId);
126 SequenceRunner* seqRunner = SequenceRunnerManager::GetInstance().GetSeqRunner(seqRunnerId);
127 if (seqRunner == nullptr) {
128 return nullptr;
129 }
130 Task* task = nullptr;
131 napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
132 if (task == nullptr) {
133 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of param must be task.");
134 return nullptr;
135 }
136 if (!task->CanForSequenceRunner(env)) {
137 return nullptr;
138 }
139 task->seqRunnerId_ = seqRunnerId;
140 napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::SEQRUNNER_TASK, seqRunner->priority_);
141 if (promise == nullptr) {
142 return nullptr;
143 }
144 if (!SequenceRunnerManager::GetInstance().FindRunnerAndRef(seqRunnerId)) {
145 return nullptr;
146 }
147 if (seqRunner->currentTaskId_ == 0) {
148 HILOG_INFO("taskpool:: taskId %{public}s in seqRunner %{public}s immediately.",
149 std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
150 seqRunner->currentTaskId_ = task->taskId_;
151 task->IncreaseRefCount();
152 task->taskState_ = ExecuteState::WAITING;
153 ExecuteTaskImmediately(task->taskId_, seqRunner->priority_);
154 } else {
155 HILOG_INFO("taskpool:: add taskId: %{public}s to seqRunner %{public}s.",
156 std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
157 SequenceRunnerManager::GetInstance().AddTaskToSeqRunner(seqRunnerId, task);
158 }
159 return promise;
160 }
161
ExecuteTaskImmediately(uint32_t taskId,Priority priority)162 void SequenceRunner::ExecuteTaskImmediately(uint32_t taskId, Priority priority)
163 {
164 TaskManager::GetInstance().EnqueueTaskId(taskId, priority);
165 }
166
SequenceRunnerDestructor(napi_env env,void * data,void * hint)167 void SequenceRunner::SequenceRunnerDestructor(napi_env env, void* data, [[maybe_unused]] void* hint)
168 {
169 SequenceRunner* seqRunner = static_cast<SequenceRunner*>(data);
170 SequenceRunnerManager::GetInstance().SequenceRunnerDestructor(seqRunner);
171 }
172
RemoveWaitingTask(Task * task)173 bool SequenceRunner::RemoveWaitingTask(Task* task)
174 {
175 std::unique_lock<std::shared_mutex> lock(seqRunnerMutex_);
176 if (seqRunnerTasks_.empty()) {
177 return false;
178 }
179 auto iter = std::find(seqRunnerTasks_.begin(), seqRunnerTasks_.end(), task);
180 if (iter != seqRunnerTasks_.end()) {
181 seqRunnerTasks_.erase(iter);
182 return true;
183 }
184 return false;
185 }
186
DecreaseSeqCount()187 uint64_t SequenceRunner::DecreaseSeqCount()
188 {
189 if (refCount_ > 0) {
190 refCount_--;
191 }
192 return refCount_;
193 }
194
IncreaseSeqCount()195 void SequenceRunner::IncreaseSeqCount()
196 {
197 refCount_++;
198 }
199
AddTask(Task * task)200 void SequenceRunner::AddTask(Task* task)
201 {
202 std::unique_lock<std::shared_mutex> seqLock(seqRunnerMutex_);
203 seqRunnerTasks_.push_back(task);
204 }
205
TriggerTask(napi_env env)206 void SequenceRunner::TriggerTask(napi_env env)
207 {
208 std::list<napi_deferred> deferreds {};
209 {
210 std::unique_lock<std::shared_mutex> lock(seqRunnerMutex_);
211 if (seqRunnerTasks_.empty()) {
212 currentTaskId_ = 0;
213 return;
214 }
215 Task* task = seqRunnerTasks_.front();
216 seqRunnerTasks_.pop_front();
217 bool isEmpty = false;
218 while (task->taskState_ == ExecuteState::CANCELED) {
219 if (refCount_ > 0) {
220 refCount_--;
221 }
222 if (task->currentTaskInfo_ != nullptr) {
223 deferreds.push_back(task->currentTaskInfo_->deferred);
224 }
225 if (task->IsSameEnv(env)) {
226 task->CancelInner(ExecuteState::CANCELED);
227 } else {
228 CancelTaskMessage* message = new CancelTaskMessage(ExecuteState::CANCELED, task->taskId_);
229 task->TriggerCancel(message);
230 }
231
232 if (seqRunnerTasks_.empty()) {
233 HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.",
234 std::to_string(seqRunnerId_).c_str());
235 currentTaskId_ = 0;
236 isEmpty = true;
237 break;
238 }
239 task = seqRunnerTasks_.front();
240 seqRunnerTasks_.pop_front();
241 }
242 if (!isEmpty) {
243 currentTaskId_ = task->taskId_;
244 task->IncreaseRefCount();
245 task->taskState_ = ExecuteState::WAITING;
246 HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.",
247 std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId_).c_str());
248 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority_);
249 }
250 }
251 TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, "taskpool:: sequenceRunner task has been canceled");
252 }
253 } // namespace Commonlibrary::Concurrent::TaskPoolModule