1 /*
2 * Copyright (C) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "nodejstaskqueue.h"
17
18 #include <chrono>
19 #include <deque>
20 #include <meta/ext/object.h>
21 #include <meta/ext/task_queue.h>
22 #include <meta/interface/intf_task_queue.h>
23 #include <meta/interface/intf_task_queue_registry.h>
24 #include <mutex>
25 #include <thread>
26
// The implementation below refers to META names (ITaskQueue, TimeSpan, ...) unqualified.
using namespace META_NS;
// Shorthand for the task identifier type handed out by AddTask and consumed by CancelTask.
using Token = ITaskQueue::Token;
29
// Task queue that executes its tasks on the thread which built it (see Build), driven either by a
// javascript "setTimeout" callback or by a napi thread-safe function when tasks are added from other threads.
class NodeJSTaskQueue : public IntroduceInterfaces<MetaObject, ITaskQueue, INodeJSTaskQueue> {
    META_OBJECT(NodeJSTaskQueue, ::ClassId::NodeJSTaskQueue, IntroduceInterfaces)
public:
    META_NO_COPY_MOVE(NodeJSTaskQueue)

    NodeJSTaskQueue();
    ~NodeJSTaskQueue() override;

private:
    // ITaskQueue
    Token AddTask(ITaskQueueTask::Ptr p) override;
    Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay) override;
    IFuture::Ptr AddWaitableTask(ITaskQueueWaitableTask::Ptr p) override;
    void CancelTask(Token token) override;

    // INodeJSTaskQueue
    napi_env GetNapiEnv() const override;
    // Acquire/Release maintain refcnt_; both only succeed on the thread that called Build.
    bool Acquire() override;
    bool Release() override;
    // true when nothing is referenced, queued or pending anymore.
    bool IsReleased() override;

    // ILifecycle
    bool Build(const IMetadata::Ptr& data) override;

private:
    // napi callbacks. Run is the function given to "setTimeout", Cleanup and Invoke are the finalizer
    // and call handler of the thread-safe function created in Acquire.
    static napi_value Run(napi_env env, napi_callback_info info);
    static void Cleanup(napi_env env, void* finalize_data, void* context);
    static void Invoke(napi_env env, napi_value js_callback, void* context, void* data);

    // arm / disarm the javascript timer stored in curTimeout_.
    void SetTimeout(napi_env env, uint32_t trigger);
    void CancelTimeout(napi_env env);

    // executes all due tasks and re-arms the timer. "lock" must own mutex_ on entry.
    void Schedule(napi_env env, std::unique_lock<std::recursive_mutex>& lock);
    // makes sure Schedule gets run; returns false if that is not possible anymore.
    bool RescheduleTimer();

    // true if this queue is the current task queue of the calling thread.
    bool IsInQueue();

    Token MakeToken(const ITaskQueueTask::Ptr p);
    // inserts the task into tasks_ keeping it ordered by execution time. (does not lock)
    void AddTaskImpl(Token ret, ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime);
    Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime);

    // time elapsed since epoch_, and the same rounded down / up to microseconds.
    std::chrono::high_resolution_clock::duration Now() const;
    TimeSpan TimeFloor() const;
    TimeSpan TimeCeil() const;

    // one queued task.
    struct Task {
        Task() = default;
        Task(Token t, TimeSpan d, TimeSpan e, const ITaskQueueTask::Ptr& p)
            : token(t), delay(d), executeTime(e), operation(p)
        {}

        Token token { 0 };
        // delay requested when the task was added, also used as the interval when the task is re-armed.
        TimeSpan delay;
        // point in time (relative to epoch_) when the task should run.
        TimeSpan executeTime;
        ITaskQueueTask::Ptr operation { nullptr };
    };

    // clock value captured in Build; all task times are relative to this.
    std::chrono::high_resolution_clock::duration epoch_ { 0 };
    napi_env env_ { nullptr };
    // thread-safe function used to wake up the executing thread from other threads.
    napi_threadsafe_function tsf_ { nullptr };
    // value returned by the latest "setTimeout" call, empty when no timer is armed.
    NapiApi::StrongRef curTimeout_;
    // javascript function wrapping Run.
    NapiApi::StrongRef RunFunc_;

    std::recursive_mutex mutex_;
    // thread that called Build.
    std::thread::id execThread_;
    // currently running task..
    Token execToken_ { nullptr };
    // pending tasks, the task to execute next is at the back.
    std::deque<Task> tasks_;
    // true while a thread-safe function call is queued but Invoke has not run yet.
    bool scheduled_ = false;
    // number of Acquire calls not yet matched by Release.
    uint32_t refcnt_ { 0 };
};
101
NodeJSTaskQueue()102 NodeJSTaskQueue::NodeJSTaskQueue() {}
103
// The destructor releases nothing by itself. It only verifies (in builds where assert is active)
// that the queue was fully shut down before destruction, ie. the state that IsReleased() reports as released.
NodeJSTaskQueue::~NodeJSTaskQueue()
{
    // no pending tasks and no outstanding Acquire calls.
    assert(tasks_.empty());
    assert(refcnt_ == 0);
    // no armed timer and no javascript function references left.
    assert(curTimeout_.IsEmpty());
    assert(RunFunc_.IsEmpty());
    // thread-safe function released and no call to it pending.
    assert(tsf_ == nullptr);
    assert(!scheduled_);
}
113
Acquire()114 bool NodeJSTaskQueue::Acquire()
115 {
116 std::unique_lock lock { mutex_ };
117 assert(execThread_ == std::this_thread::get_id());
118 if (execThread_ != std::this_thread::get_id()) {
119 // called from incorrect thread. do nothing.
120 return false;
121 }
122 if (refcnt_ == 0) {
123 if (tsf_ == nullptr) {
124 napi_value func;
125 napi_create_function(env_, "NodeJsTaskQueueRun", 0, Run, this, &func);
126 RunFunc_ = { env_, func };
127
128 napi_value name;
129 napi_create_string_latin1(env_, "NodeJSTaskQueue", 1, &name);
130 napi_create_threadsafe_function(env_, nullptr, nullptr, name, 0, 1, nullptr, Cleanup, this, Invoke, &tsf_);
131 }
132 }
133 refcnt_++;
134 return true;
135 }
136
Release()137 bool NodeJSTaskQueue::Release()
138 {
139 std::unique_lock lock { mutex_ };
140 assert(execThread_ == std::this_thread::get_id());
141 if (execThread_ != std::this_thread::get_id()) {
142 // called from incorrect thread. do nothing.
143 return false;
144 }
145 if (refcnt_ == 0) {
146 // already released.
147 return false;
148 }
149 if (refcnt_ == 1) {
150 refcnt_ = 0;
151 // fully released, we may be allowed to release the resources
152 if (tasks_.empty()) {
153 // queue empty, we can finalize.
154 auto tsf = tsf_;
155 tsf_ = nullptr;
156 RunFunc_.Reset();
157 curTimeout_.Reset();
158 lock.unlock();
159 napi_release_threadsafe_function(tsf, napi_threadsafe_function_release_mode::napi_tsfn_abort);
160 }
161 return true;
162 }
163 refcnt_--;
164 return true;
165 }
166
IsReleased()167 bool NodeJSTaskQueue::IsReleased()
168 {
169 std::unique_lock lock { mutex_ };
170 assert(execThread_ == std::this_thread::get_id());
171 if (execThread_ != std::this_thread::get_id()) {
172 // called from incorrect thread.
173 return false;
174 }
175 if (refcnt_ > 0) {
176 return false;
177 }
178 if (!tasks_.empty()) {
179 return false;
180 }
181 if (tsf_) {
182 return false;
183 }
184 if (scheduled_) {
185 // still busy..
186 return false;
187 }
188 return true;
189 }
190
Build(const IMetadata::Ptr & data)191 bool NodeJSTaskQueue::Build(const IMetadata::Ptr& data)
192 {
193 if (!Super::Build(data)) {
194 return false;
195 }
196 if (auto prp = data->GetProperty<uintptr_t>("env")) {
197 env_ = (napi_env)prp->GetValue();
198 } else {
199 return false;
200 }
201 execThread_ = std::this_thread::get_id();
202 epoch_ = std::chrono::high_resolution_clock::now().time_since_epoch();
203
204 // make the NodeJSTaskQueue current for this thread.
205 // setting it here, makes sure that GetCurrentTaskQueue could be used to identify the thread.
206 // unless someone runs other taskqueues in js thread.. hmm
207 GetTaskQueueRegistry().SetCurrentTaskQueue(GetSelf<ITaskQueue>());
208
209 return true;
210 }
211
// Returns the napi environment stored by Build. (nullptr if Build has not stored one)
napi_env NodeJSTaskQueue::GetNapiEnv() const
{
    return env_;
}
Run(napi_env env,napi_callback_info info)216 napi_value NodeJSTaskQueue::Run(napi_env env, napi_callback_info info)
217 {
218 NapiApi::FunctionContext fc(env, info);
219 auto me = static_cast<NodeJSTaskQueue*>(fc.GetData());
220 std::unique_lock lock { me->mutex_ };
221 me->curTimeout_.Reset();
222 me->Schedule(env, lock);
223 return {};
224 }
SetTimeout(napi_env env,uint32_t trigger)225 void NodeJSTaskQueue::SetTimeout(napi_env env, uint32_t trigger)
226 {
227 NapiApi::Env e(env);
228 napi_value global;
229 napi_get_global(env, &global);
230 NapiApi::Object g(e, global);
231 NapiApi::Function func = g.Get<NapiApi::Function>("setTimeout");
232
233 napi_value args[2];
234 args[0] = RunFunc_.GetValue();
235 args[1] = e.GetNumber(trigger);
236 napi_value res { nullptr };
237 res = func.Invoke(g, 2, args); // 2: arg num
238 curTimeout_ = { e, res };
239 }
CancelTimeout(napi_env env)240 void NodeJSTaskQueue::CancelTimeout(napi_env env)
241 {
242 if (curTimeout_.IsEmpty()) {
243 return;
244 }
245 // cancel previous timeout
246 NapiApi::Env e(env);
247 napi_value global;
248 napi_get_global(env, &global);
249 NapiApi::Object g(e, global);
250 NapiApi::Function func = g.Get<NapiApi::Function>("clearTimeout");
251
252 napi_value args[1];
253 args[0] = curTimeout_.GetValue();
254 func.Invoke(g, 1, args);
255 curTimeout_.Reset();
256 }
// Executes every task whose execution time has passed, re-queues the ones that asked to be run again,
// and then either arms a javascript timer for the next pending task or, if the queue is empty and
// nobody holds a reference, releases the napi resources.
// "lock" must own mutex_ when calling. It is released while a task is being invoked and re-acquired after.
void NodeJSTaskQueue::Schedule(napi_env env, std::unique_lock<std::recursive_mutex>& lock)
{
    // NOTE: lock MUST be in locked state when entering here.
    assert(lock.owns_lock());

    // running in javascript side.
    // tasks which returned true from Invoke and were not cancelled meanwhile, they are re-added after the loop.
    BASE_NS::vector<Task> rearm;
    auto curTime = TimeFloor(); // round down the time..
    // 1. see how long until the first scheduled task. (looping through the queue ,executing tasks if needed)
    for (;;) {
        if (tasks_.empty()) {
            // no tasks to execute.
            break;
        }
        // the task to execute next is at the back of the queue. (copied, as the entry is popped below)
        auto front = tasks_.back();
        // 2. if task should have executed already, invoke it.
        if (curTime >= front.executeTime) {
            // execute
            tasks_.pop_back();
            // publish the running token, CancelTask clears it to prevent the task from being re-armed.
            execToken_ = front.token;
            lock.unlock();
            // make this queue the current one for the duration of the task, and restore the previous one after.
            auto q = GetTaskQueueRegistry().SetCurrentTaskQueue(GetSelf<ITaskQueue>());
            auto ret = front.operation->Invoke();
            GetTaskQueueRegistry().SetCurrentTaskQueue(q);
            if (!ret) {
                // if the task is to be terminated, do it here.
                // (the reference is dropped while the mutex is not held)
                front = {};
            }
            lock.lock();
            if (ret && execToken_) {
                rearm.emplace_back(front.token, front.delay, front.executeTime, BASE_NS::move(front.operation));
            }
            execToken_ = { 0 };
            continue;
        }
        // the next task is not due yet. (and neither are the ones after it)
        break;
    }
    // cancel timeout if there is one
    CancelTimeout(env);

    if (!rearm.empty()) {
        // just add them..
        for (auto t : rearm) {
            // calculate the next executeTime in phase.. (ie. how many events missed)
            const auto dt = t.delay;
            const auto ct = curTime;
            if (dt > TimeSpan::Zero()) {
                auto et = t.executeTime;
                // calculate the next executeTime in phase..
                // (number of whole intervals needed to get from the old execution time past the current time)
                auto ticks = ((ct - et) + dt) / dt;
                // and based on the "ticks" we can now count the next execution time.
                et += (ticks * dt);
                AddTaskImpl(t.token, t.operation, dt, et);
            } else {
                // re-scheduling zero delay ones..
                // so zero delay, and now.
                AddTaskImpl(t.token, t.operation, TimeSpan::Zero(), ct);
            }
        }
    }

    if (!tasks_.empty()) {
        // 3. schedule a timer for remaining tasks. (using the front most time)
        const auto& front = tasks_.back();
        int64_t delayMS = (front.executeTime - curTime).ToMilliseconds();
        // clamp to at most 10 seconds of wait and at minimum 0 milliseconds.
        uint32_t trigger = BASE_NS::Math::max(int64_t(0), BASE_NS::Math::min(int64_t(10000), delayMS));
        SetTimeout(env, trigger);
    }

    if (tasks_.empty()) {
        // queue empty, we may be allowed to release resources.
        if (refcnt_ == 0) {
            if (tsf_) {
                // release the TSF, we have given permission to terminate.
                auto tsf = tsf_;
                tsf_ = nullptr;
                napi_release_threadsafe_function(tsf, napi_threadsafe_function_release_mode::napi_tsfn_abort);
            }
            RunFunc_.Reset();
        }
    }
}
340
RescheduleTimer()341 bool NodeJSTaskQueue::RescheduleTimer()
342 {
343 std::unique_lock lock { mutex_ };
344 if (IsInQueue()) {
345 // call directly as we are in JS thread.
346 Schedule(env_, lock);
347 return true;
348 }
349 if (!tsf_) {
350 // tsf does not exist anymore, so stop scheduling.
351 return false;
352 }
353 if (!scheduled_) {
354 // TSF not queued yet.
355 // use tsf to schedule
356 auto res = napi_acquire_threadsafe_function(tsf_);
357 if (res != napi_ok) {
358 // could not acquire the function anymore.
359 // (most likely cleanup already in progress)
360 // fail.
361 return false;
362 }
363 void* data = nullptr;
364 scheduled_ = true;
365 res = napi_call_threadsafe_function(tsf_, data, napi_threadsafe_function_call_mode::napi_tsfn_blocking);
366 if (res == napi_closing) {
367 // clean up in progress.
368 return false;
369 }
370 res = napi_release_threadsafe_function(tsf_, napi_threadsafe_function_release_mode::napi_tsfn_release);
371 if (res != napi_ok) {
372 return false;
373 }
374 }
375 return true;
376 }
377
Cleanup(napi_env env,void * finalize_data,void * context)378 void NodeJSTaskQueue::Cleanup(napi_env env, void* finalize_data, void* context)
379 {
380 auto* me = static_cast<NodeJSTaskQueue*>(context);
381 // we actually do not need to do anything here now.
382 }
Invoke(napi_env env,napi_value js_callback,void * context,void * data)383 void NodeJSTaskQueue::Invoke(napi_env env, napi_value js_callback, void* context, void* data)
384 {
385 auto* me = static_cast<NodeJSTaskQueue*>(context);
386 if (me) {
387 std::unique_lock lock { me->mutex_ };
388 me->scheduled_ = false;
389 me->Schedule(env, lock);
390 }
391 }
IsInQueue()392 bool NodeJSTaskQueue::IsInQueue()
393 {
394 auto tmp = GetTaskQueueRegistry().GetCurrentTaskQueue();
395 if (!tmp) {
396 return false;
397 }
398 // this is the correct way to compare random objects.
399 auto i1 = tmp->GetInterface(CORE_NS::IInterface::UID);
400 auto i2 = GetInterface(CORE_NS::IInterface::UID);
401 return (i1 == i2);
402 }
CancelTask(Token token)403 void NodeJSTaskQueue::CancelTask(Token token)
404 {
405 if (!token) {
406 // invalid parameter.
407 return;
408 }
409 bool sameThread = false;
410 auto curThread = std::this_thread::get_id();
411 if (IsInQueue()) {
412 // currently executing tasks in this thread.
413 sameThread = true;
414 }
415 if (curThread != execThread_) {
416 sameThread = true;
417 }
418 std::unique_lock lock { mutex_ };
419 Token executingToken = execToken_;
420 if (token == execToken_) {
421 // Currently executing task is requested to cancel.
422 // Tasks are temporarily removed from the queue while execution, so the currently running task is not in
423 // the queue anymore. Setting execToken_ to null will cause the task to not be re-added.
424 execToken_ = nullptr;
425 }
426
427 // If we are currently executing the task in different thread, wait for it to complete.
428 // ie. called OUTSIDE the queue.
429 if (!sameThread) {
430 while (token == executingToken) {
431 lock.unlock();
432 // sleep a bit.
433 std::this_thread::yield();
434 lock.lock();
435 executingToken = execToken_;
436 }
437 }
438
439 // Remove all tasks from the queue, with the same token. (if any)
440 // One can push the same task to the queue multiple times currently.
441 // (ie. you "can" schedule the same task with different "delays")
442 // So we remove all scheduled tasks with same token.
443 // Also redo/rearm might have add the task back while we were waiting/yielding.
444 for (auto it = tasks_.begin(); it != tasks_.end();) {
445 if (it->token == token) {
446 it = tasks_.erase(it);
447 } else {
448 it++;
449 }
450 }
451 // trigger js side to schedule the tasks!
452 RescheduleTimer();
453 }
454
AddTaskImpl(Token ret,ITaskQueueTask::Ptr p,const TimeSpan & delay,const TimeSpan & excTime)455 void NodeJSTaskQueue::AddTaskImpl(Token ret, ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
456 {
457 if (auto i = interface_cast<ITaskScheduleInfo>(p)) {
458 i->SetQueueAndToken(GetSelf<ITaskQueue>(), ret);
459 }
460
461 // insertion sort the tasks
462 //
463 // early out trivial cases (empty list, insert first, insert last..)
464 if (tasks_.empty()) {
465 tasks_.emplace_back(ret, delay, excTime, BASE_NS::move(p));
466 } else if (excTime <= tasks_.back().executeTime) {
467 // new task should execute before the current first one.
468 tasks_.emplace_back(ret, delay, excTime, BASE_NS::move(p));
469 } else if (excTime > tasks_.front().executeTime) {
470 // new task should execute after the current last one.
471 tasks_.emplace_front(ret, delay, excTime, BASE_NS::move(p));
472 } else {
473 // finally resort to linear search..
474 // (could use a smarter search but expect that there are not too many tasks in queue)
475 for (auto it = tasks_.begin(); it != tasks_.end(); ++it) {
476 if (it->executeTime <= excTime) {
477 // task in list should execute after us, so insert there.
478 tasks_.insert(it, { ret, delay, excTime, BASE_NS::move(p) });
479 break;
480 }
481 }
482 }
483 }
// Returns the token identifying the given task.
// The same task object always yields the same token, CancelTask removes all entries sharing it.
Token NodeJSTaskQueue::MakeToken(const ITaskQueueTask::Ptr p)
{
    // use the old cheap "task raw pointer as token" method.
    return p.get();
}
AddTask(ITaskQueueTask::Ptr p,const TimeSpan & delay,const TimeSpan & excTime)489 Token NodeJSTaskQueue::AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
490 {
491 if (!p) {
492 return nullptr;
493 }
494 std::unique_lock lock { mutex_ };
495 // unique
496 Token result = MakeToken(p);
497 AddTaskImpl(result, BASE_NS::move(p), delay, excTime);
498
499 // trigger js side to schedule the tasks!
500 if (!RescheduleTimer()) {
501 // can not schedule tasks anymore.
502 // remove the task we TRIED to add..
503 for (auto it = tasks_.begin(); it != tasks_.end(); it++) {
504 if ((it->token = result) && (it->executeTime == excTime) && (it->delay == delay)) {
505 tasks_.erase(it);
506 break;
507 }
508 }
509 result = { nullptr };
510 }
511 return result;
512 }
513
AddTask(ITaskQueueTask::Ptr p)514 Token NodeJSTaskQueue::AddTask(ITaskQueueTask::Ptr p)
515 {
516 return AddTask(BASE_NS::move(p), TimeSpan::Milliseconds(0));
517 }
518
AddTask(ITaskQueueTask::Ptr p,const TimeSpan & delay)519 Token NodeJSTaskQueue::AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay)
520 {
521 return AddTask(BASE_NS::move(p), delay, TimeCeil() + delay);
522 }
523
AddWaitableTask(ITaskQueueWaitableTask::Ptr p)524 IFuture::Ptr NodeJSTaskQueue::AddWaitableTask(ITaskQueueWaitableTask::Ptr p)
525 {
526 if (IsInQueue()) {
527 // warning! this may cause a deadlock.
528 LOG_W("Adding a waitable task to the same thread! Wait may never complete!");
529 }
530
531 auto promise = GetObjectRegistry().Create<IPromise>(META_NS::ClassId::Promise);
532 BASE_NS::shared_ptr<PromisedQueueTask> task(new PromisedQueueTask(p, move(promise)));
533 auto f = task->GetFuture();
534 if (!AddTask(BASE_NS::move(task))) {
535 // could not schedule the task.
536 // so the promise is abandoned.
537 promise->SetAbandoned();
538 }
539 return f;
540 }
541
Now() const542 std::chrono::high_resolution_clock::duration NodeJSTaskQueue::Now() const
543 {
544 using namespace std::chrono;
545 return high_resolution_clock::now().time_since_epoch() - epoch_;
546 }
TimeFloor() const547 TimeSpan NodeJSTaskQueue::TimeFloor() const
548 {
549 using namespace std::chrono;
550 return TimeSpan::Microseconds(floor<microseconds>(Now()).count());
551 }
TimeCeil() const552 TimeSpan NodeJSTaskQueue::TimeCeil() const
553 {
554 using namespace std::chrono;
555 return TimeSpan::Microseconds(ceil<microseconds>(Now()).count());
556 }
557
GetOrCreateNodeTaskQueue(napi_env e)558 META_NS::ITaskQueue::Ptr GetOrCreateNodeTaskQueue(napi_env e)
559 {
560 auto& tr = GetTaskQueueRegistry();
561 if (const auto alreadyRegistered = tr.GetTaskQueue(JS_THREAD_DEP)) {
562 return alreadyRegistered;
563 }
564
565 auto& obr = GetObjectRegistry();
566 obr.RegisterObjectType<NodeJSTaskQueue>();
567
568 auto params = obr.Create<IMetadata>(META_NS::ClassId::Object);
569 if (!params) {
570 return {};
571 }
572 auto p = ConstructProperty<uintptr_t>("env", uintptr_t(e), ObjectFlagBits::INTERNAL | ObjectFlagBits::NATIVE);
573 if (!p) {
574 return {};
575 }
576 params->AddProperty(p);
577 const auto result = obr.Create<ITaskQueue>(::ClassId::NodeJSTaskQueue, params);
578 if (result) {
579 tr.RegisterTaskQueue(result, JS_THREAD_DEP);
580 }
581 return result;
582 }
583
DeinitNodeTaskQueue()584 bool DeinitNodeTaskQueue()
585 {
586 auto& tr = GetTaskQueueRegistry();
587 auto tq = tr.GetTaskQueue(JS_THREAD_DEP);
588 if (!tq) {
589 // already deinitialized
590 return true;
591 }
592 if (!tq->GetInterface<INodeJSTaskQueue>()->IsReleased()) {
593 // Unsafe deinitialize
594 return false;
595 }
596 // can be safely unregistered.
597 // expect the instance to be destroyed now.
598 tr.UnregisterTaskQueue(JS_THREAD_DEP);
599 GetTaskQueueRegistry().SetCurrentTaskQueue({});
600 return true;
601 }
602