• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "sequence_runner.h"
16 
17 #include "sequence_runner_manager.h"
18 #include "task_manager.h"
19 
20 namespace Commonlibrary::Concurrent::TaskPoolModule {
21 using namespace Commonlibrary::Concurrent::Common::Helper;
22 static constexpr char EXECUTE_STR[] = "execute";
23 static constexpr char SEQ_RUNNER_ID_STR[] = "seqRunnerId";
24 
SeqRunnerConstructorInner(napi_env env,napi_value & thisVar,SequenceRunner * seqRunner)25 bool SequenceRunner::SeqRunnerConstructorInner(napi_env env, napi_value& thisVar, SequenceRunner* seqRunner)
26 {
27     // update seqRunner.seqRunnerId
28     uint64_t seqRunnerId = reinterpret_cast<uint64_t>(seqRunner);
29     napi_value napiSeqRunnerId = NapiHelper::CreateUint64(env, seqRunnerId);
30     SequenceRunnerManager::GetInstance().StoreSequenceRunner(seqRunnerId, seqRunner);
31     napi_property_descriptor properties[] = {
32         DECLARE_NAPI_PROPERTY(SEQ_RUNNER_ID_STR, napiSeqRunnerId),
33         DECLARE_NAPI_FUNCTION(EXECUTE_STR, Execute),
34     };
35     napi_define_properties(env, thisVar, sizeof(properties) / sizeof(properties[0]), properties);
36     HILOG_INFO("taskpool:: construct seqRunner name is %{public}s, seqRunnerid %{public}s.",
37                seqRunner->seqName_.c_str(), std::to_string(seqRunnerId).c_str());
38 
39     seqRunner->seqRunnerId_ = seqRunnerId;
40     napi_status status = napi_wrap(env, thisVar, seqRunner, SequenceRunnerDestructor, nullptr, nullptr);
41     if (status != napi_ok) {
42         HILOG_ERROR("taskpool::SeqRunnerConstructorInner napi_wrap return value is %{public}d", status);
43         SequenceRunnerDestructor(env, seqRunner, nullptr);
44         return false;
45     }
46     return true;
47 }
48 
SeqRunnerConstructor(napi_env env,napi_callback_info cbinfo)49 napi_value SequenceRunner::SeqRunnerConstructor(napi_env env, napi_callback_info cbinfo)
50 {
51     // get input args out of env and cbinfo
52     size_t argc = 2; // 2: The maximum number of parameters is 2
53     napi_value args[2]; // 2: The maximum number of parameters is 2
54     napi_value thisVar;
55     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
56 
57     uint32_t priority = Priority::DEFAULT;
58     std::string name = "";
59     if (argc == 2) { // 2: The number of parameters is 2, if the first is seqRunner name, the second must be priority
60         if (NapiHelper::IsString(env, args[0]) && NapiHelper::IsNumber(env, args[1])) {
61             name = NapiHelper::GetString(env, args[0]);
62             priority = NapiHelper::GetUint32Value(env, args[1]);
63             if (priority >= Priority::NUMBER) {
64                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value unvalied.");
65                 return nullptr;
66             }
67         } else {
68             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
69                 "the type of first param must be string and the type of second param must be string.");
70             return nullptr;
71         }
72     } else if (argc == 1) {
73         if (NapiHelper::IsString(env, args[0])) {
74             name = NapiHelper::GetString(env, args[0]);
75         } else if (NapiHelper::IsNumber(env, args[0])) {
76             priority = NapiHelper::GetUint32Value(env, args[0]);
77             if (priority >= Priority::NUMBER) {
78                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value unvalied.");
79                 return nullptr;
80             }
81         } else {
82             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of first param must be string or number.");
83             return nullptr;
84         }
85     }
86 
87     SequenceRunner* seqRunner = nullptr;
88     if (name != "") {
89         seqRunner = SequenceRunnerManager::GetInstance().CreateOrGetGlobalRunner(env, thisVar, argc, name, priority);
90         if (seqRunner == nullptr) {
91             HILOG_ERROR("taskpool:: create or get globalRunner failed");
92             return nullptr;
93         }
94     } else {
95         seqRunner = new SequenceRunner(static_cast<Priority>(priority));
96     }
97 
98     if (!SeqRunnerConstructorInner(env, thisVar, seqRunner)) {
99         HILOG_ERROR("taskpool:: SeqRunnerConstructorInner failed");
100         return nullptr;
101     }
102     return thisVar;
103 }
104 
Execute(napi_env env,napi_callback_info cbinfo)105 napi_value SequenceRunner::Execute(napi_env env, napi_callback_info cbinfo)
106 {
107     size_t argc = 1;
108     napi_value args[1];
109     napi_value thisVar;
110     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
111     std::string errMessage = "";
112     if (argc < 1) {
113         errMessage = "seqRunner:: number of params at least one";
114         HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
115         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of param at least one.");
116         return nullptr;
117     }
118     if (!NapiHelper::IsObject(env, args[0]) || !NapiHelper::HasNameProperty(env, args[0], TASKID_STR)) {
119         errMessage = "seqRunner:: first param must be task.";
120         HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
121         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be task.");
122         return nullptr;
123     }
124     napi_value napiSeqRunnerId = NapiHelper::GetNameProperty(env, thisVar, SEQ_RUNNER_ID_STR);
125     uint64_t seqRunnerId = NapiHelper::GetUint64Value(env, napiSeqRunnerId);
126     SequenceRunner* seqRunner = SequenceRunnerManager::GetInstance().GetSeqRunner(seqRunnerId);
127     if (seqRunner == nullptr) {
128         return nullptr;
129     }
130     Task* task = nullptr;
131     napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
132     if (task == nullptr) {
133         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of param must be task.");
134         return nullptr;
135     }
136     if (!task->CanForSequenceRunner(env)) {
137         return nullptr;
138     }
139     task->seqRunnerId_ = seqRunnerId;
140     napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::SEQRUNNER_TASK, seqRunner->priority_);
141     if (promise == nullptr) {
142         return nullptr;
143     }
144     if (!SequenceRunnerManager::GetInstance().FindRunnerAndRef(seqRunnerId)) {
145         return nullptr;
146     }
147     if (seqRunner->currentTaskId_ == 0) {
148         HILOG_INFO("taskpool:: taskId %{public}s in seqRunner %{public}s immediately.",
149                    std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
150         seqRunner->currentTaskId_ = task->taskId_;
151         task->IncreaseRefCount();
152         task->taskState_ = ExecuteState::WAITING;
153         ExecuteTaskImmediately(task->taskId_, seqRunner->priority_);
154     } else {
155         HILOG_INFO("taskpool:: add taskId: %{public}s to seqRunner %{public}s.",
156                    std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
157         SequenceRunnerManager::GetInstance().AddTaskToSeqRunner(seqRunnerId, task);
158     }
159     return promise;
160 }
161 
ExecuteTaskImmediately(uint32_t taskId,Priority priority)162 void SequenceRunner::ExecuteTaskImmediately(uint32_t taskId, Priority priority)
163 {
164     TaskManager::GetInstance().EnqueueTaskId(taskId, priority);
165 }
166 
SequenceRunnerDestructor(napi_env env,void * data,void * hint)167 void SequenceRunner::SequenceRunnerDestructor(napi_env env, void* data, [[maybe_unused]] void* hint)
168 {
169     SequenceRunner* seqRunner = static_cast<SequenceRunner*>(data);
170     SequenceRunnerManager::GetInstance().SequenceRunnerDestructor(seqRunner);
171 }
172 
RemoveWaitingTask(Task * task)173 bool SequenceRunner::RemoveWaitingTask(Task* task)
174 {
175     std::unique_lock<std::shared_mutex> lock(seqRunnerMutex_);
176     if (seqRunnerTasks_.empty()) {
177         return false;
178     }
179     auto iter = std::find(seqRunnerTasks_.begin(), seqRunnerTasks_.end(), task);
180     if (iter != seqRunnerTasks_.end()) {
181         seqRunnerTasks_.erase(iter);
182         return true;
183     }
184     return false;
185 }
186 
DecreaseSeqCount()187 uint64_t SequenceRunner::DecreaseSeqCount()
188 {
189     if (refCount_ > 0) {
190         refCount_--;
191     }
192     return refCount_;
193 }
194 
IncreaseSeqCount()195 void SequenceRunner::IncreaseSeqCount()
196 {
197     refCount_++;
198 }
199 
AddTask(Task * task)200 void SequenceRunner::AddTask(Task* task)
201 {
202     std::unique_lock<std::shared_mutex> seqLock(seqRunnerMutex_);
203     seqRunnerTasks_.push_back(task);
204 }
205 
TriggerTask(napi_env env)206 void SequenceRunner::TriggerTask(napi_env env)
207 {
208     std::list<napi_deferred> deferreds {};
209     {
210         std::unique_lock<std::shared_mutex> lock(seqRunnerMutex_);
211         if (seqRunnerTasks_.empty()) {
212             currentTaskId_ = 0;
213             return;
214         }
215         Task* task = seqRunnerTasks_.front();
216         seqRunnerTasks_.pop_front();
217         bool isEmpty = false;
218         while (task->taskState_ == ExecuteState::CANCELED) {
219             if (refCount_ > 0) {
220                 refCount_--;
221             }
222             if (task->currentTaskInfo_ != nullptr) {
223                 deferreds.push_back(task->currentTaskInfo_->deferred);
224             }
225             if (task->IsSameEnv(env)) {
226                 task->CancelInner(ExecuteState::CANCELED);
227             } else {
228                 CancelTaskMessage* message = new CancelTaskMessage(ExecuteState::CANCELED, task->taskId_);
229                 task->TriggerCancel(message);
230             }
231 
232             if (seqRunnerTasks_.empty()) {
233                 HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.",
234                             std::to_string(seqRunnerId_).c_str());
235                 currentTaskId_ = 0;
236                 isEmpty = true;
237                 break;
238             }
239             task = seqRunnerTasks_.front();
240             seqRunnerTasks_.pop_front();
241         }
242         if (!isEmpty) {
243             currentTaskId_ = task->taskId_;
244             task->IncreaseRefCount();
245             task->taskState_ = ExecuteState::WAITING;
246             HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.",
247                         std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId_).c_str());
248             TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority_);
249         }
250     }
251     TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, "taskpool:: sequenceRunner task has been canceled");
252 }
253 } // namespace Commonlibrary::Concurrent::TaskPoolModule