• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "nodejstaskqueue.h"
17 
18 #include <chrono>
19 #include <deque>
20 #include <meta/ext/object.h>
21 #include <meta/ext/task_queue.h>
22 #include <meta/interface/intf_task_queue.h>
23 #include <meta/interface/intf_task_queue_registry.h>
24 #include <mutex>
25 #include <thread>
26 
27 using namespace META_NS;
28 using Token = ITaskQueue::Token;
29 
30 class NodeJSTaskQueue : public IntroduceInterfaces<MetaObject, ITaskQueue, INodeJSTaskQueue> {
31     META_OBJECT(NodeJSTaskQueue, ::ClassId::NodeJSTaskQueue, IntroduceInterfaces)
32 public:
33     META_NO_COPY_MOVE(NodeJSTaskQueue)
34 
35     NodeJSTaskQueue();
36     ~NodeJSTaskQueue() override;
37 
38 private:
39     // ITaskQueue
40     Token AddTask(ITaskQueueTask::Ptr p) override;
41     Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay) override;
42     IFuture::Ptr AddWaitableTask(ITaskQueueWaitableTask::Ptr p) override;
43     void CancelTask(Token token) override;
44 
45     // INodeJSTaskQueue
46     napi_env GetNapiEnv() const override;
47     bool Acquire() override;
48     bool Release() override;
49     bool IsReleased() override;
50 
51     // ILifecycle
52     bool Build(const IMetadata::Ptr& data) override;
53 
54 private:
55     static napi_value Run(napi_env env, napi_callback_info info);
56     static void Cleanup(napi_env env, void* finalize_data, void* context);
57     static void Invoke(napi_env env, napi_value js_callback, void* context, void* data);
58 
59     void SetTimeout(napi_env env, uint32_t trigger);
60     void CancelTimeout(napi_env env);
61 
62     void Schedule(napi_env env, std::unique_lock<std::recursive_mutex>& lock);
63     bool RescheduleTimer();
64 
65     bool IsInQueue();
66 
67     Token MakeToken(const ITaskQueueTask::Ptr p);
68     void AddTaskImpl(Token ret, ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime);
69     Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime);
70 
71     std::chrono::high_resolution_clock::duration Now() const;
72     TimeSpan TimeFloor() const;
73     TimeSpan TimeCeil() const;
74 
75     struct Task {
76         Task() = default;
TaskNodeJSTaskQueue::Task77         Task(Token t, TimeSpan d, TimeSpan e, const ITaskQueueTask::Ptr& p)
78             : token(t), delay(d), executeTime(e), operation(p)
79         {}
80 
81         Token token { 0 };
82         TimeSpan delay;
83         TimeSpan executeTime;
84         ITaskQueueTask::Ptr operation { nullptr };
85     };
86 
87     std::chrono::high_resolution_clock::duration epoch_ { 0 };
88     napi_env env_ { nullptr };
89     napi_threadsafe_function tsf_ { nullptr };
90     NapiApi::StrongRef curTimeout_;
91     NapiApi::StrongRef RunFunc_;
92 
93     std::recursive_mutex mutex_;
94     std::thread::id execThread_;
95     // currently running task..
96     Token execToken_ { nullptr };
97     std::deque<Task> tasks_;
98     bool scheduled_ = false;
99     uint32_t refcnt_ { 0 };
100 };
101 
NodeJSTaskQueue()102 NodeJSTaskQueue::NodeJSTaskQueue() {}
103 
~NodeJSTaskQueue()104 NodeJSTaskQueue::~NodeJSTaskQueue()
105 {
106     assert(tasks_.empty());
107     assert(refcnt_ == 0);
108     assert(curTimeout_.IsEmpty());
109     assert(RunFunc_.IsEmpty());
110     assert(tsf_ == nullptr);
111     assert(!scheduled_);
112 }
113 
Acquire()114 bool NodeJSTaskQueue::Acquire()
115 {
116     std::unique_lock lock { mutex_ };
117     assert(execThread_ == std::this_thread::get_id());
118     if (execThread_ != std::this_thread::get_id()) {
119         // called from incorrect thread. do nothing.
120         return false;
121     }
122     if (refcnt_ == 0) {
123         if (tsf_ == nullptr) {
124             napi_value func;
125             napi_create_function(env_, "NodeJsTaskQueueRun", 0, Run, this, &func);
126             RunFunc_ = { env_, func };
127 
128             napi_value name;
129             napi_create_string_latin1(env_, "NodeJSTaskQueue", 1, &name);
130             napi_create_threadsafe_function(env_, nullptr, nullptr, name, 0, 1, nullptr, Cleanup, this, Invoke, &tsf_);
131         }
132     }
133     refcnt_++;
134     return true;
135 }
136 
Release()137 bool NodeJSTaskQueue::Release()
138 {
139     std::unique_lock lock { mutex_ };
140     assert(execThread_ == std::this_thread::get_id());
141     if (execThread_ != std::this_thread::get_id()) {
142         // called from incorrect thread. do nothing.
143         return false;
144     }
145     if (refcnt_ == 0) {
146         // already released.
147         return false;
148     }
149     if (refcnt_ == 1) {
150         refcnt_ = 0;
151         // fully released, we may be allowed to release the resources
152         if (tasks_.empty()) {
153             // queue empty, we can finalize.
154             auto tsf = tsf_;
155             tsf_ = nullptr;
156             RunFunc_.Reset();
157             curTimeout_.Reset();
158             lock.unlock();
159             napi_release_threadsafe_function(tsf, napi_threadsafe_function_release_mode::napi_tsfn_abort);
160         }
161         return true;
162     }
163     refcnt_--;
164     return true;
165 }
166 
IsReleased()167 bool NodeJSTaskQueue::IsReleased()
168 {
169     std::unique_lock lock { mutex_ };
170     assert(execThread_ == std::this_thread::get_id());
171     if (execThread_ != std::this_thread::get_id()) {
172         // called from incorrect thread.
173         return false;
174     }
175     if (refcnt_ > 0) {
176         return false;
177     }
178     if (!tasks_.empty()) {
179         return false;
180     }
181     if (tsf_) {
182         return false;
183     }
184     if (scheduled_) {
185         // still busy..
186         return false;
187     }
188     return true;
189 }
190 
Build(const IMetadata::Ptr & data)191 bool NodeJSTaskQueue::Build(const IMetadata::Ptr& data)
192 {
193     if (!Super::Build(data)) {
194         return false;
195     }
196     if (auto prp = data->GetProperty<uintptr_t>("env")) {
197         env_ = (napi_env)prp->GetValue();
198     } else {
199         return false;
200     }
201     execThread_ = std::this_thread::get_id();
202     epoch_ = std::chrono::high_resolution_clock::now().time_since_epoch();
203 
204     // make the NodeJSTaskQueue current for this thread.
205     // setting it here, makes sure that GetCurrentTaskQueue could be used to identify the thread.
206     // unless someone runs other taskqueues in js thread.. hmm
207     GetTaskQueueRegistry().SetCurrentTaskQueue(GetSelf<ITaskQueue>());
208 
209     return true;
210 }
211 
GetNapiEnv() const212 napi_env NodeJSTaskQueue::GetNapiEnv() const
213 {
214     return env_;
215 }
Run(napi_env env,napi_callback_info info)216 napi_value NodeJSTaskQueue::Run(napi_env env, napi_callback_info info)
217 {
218     NapiApi::FunctionContext fc(env, info);
219     auto me = static_cast<NodeJSTaskQueue*>(fc.GetData());
220     std::unique_lock lock { me->mutex_ };
221     me->curTimeout_.Reset();
222     me->Schedule(env, lock);
223     return {};
224 }
SetTimeout(napi_env env,uint32_t trigger)225 void NodeJSTaskQueue::SetTimeout(napi_env env, uint32_t trigger)
226 {
227     NapiApi::Env e(env);
228     napi_value global;
229     napi_get_global(env, &global);
230     NapiApi::Object g(e, global);
231     NapiApi::Function func = g.Get<NapiApi::Function>("setTimeout");
232 
233     napi_value args[2];
234     args[0] = RunFunc_.GetValue();
235     args[1] = e.GetNumber(trigger);
236     napi_value res { nullptr };
237     res = func.Invoke(g, 2, args); // 2: arg num
238     curTimeout_ = { e, res };
239 }
CancelTimeout(napi_env env)240 void NodeJSTaskQueue::CancelTimeout(napi_env env)
241 {
242     if (curTimeout_.IsEmpty()) {
243         return;
244     }
245     // cancel previous timeout
246     NapiApi::Env e(env);
247     napi_value global;
248     napi_get_global(env, &global);
249     NapiApi::Object g(e, global);
250     NapiApi::Function func = g.Get<NapiApi::Function>("clearTimeout");
251 
252     napi_value args[1];
253     args[0] = curTimeout_.GetValue();
254     func.Invoke(g, 1, args);
255     curTimeout_.Reset();
256 }
Schedule(napi_env env,std::unique_lock<std::recursive_mutex> & lock)257 void NodeJSTaskQueue::Schedule(napi_env env, std::unique_lock<std::recursive_mutex>& lock)
258 {
259     // NOTE: lock MUST be in locked state when entering here.
260     assert(lock.owns_lock());
261 
262     // running in javascript side.
263     BASE_NS::vector<Task> rearm;
264     auto curTime = TimeFloor(); // round down the time..
265     // 1. see how long until the first scheduled task. (looping through the queue ,executing tasks if needed)
266     for (;;) {
267         if (tasks_.empty()) {
268             // no tasks to execute.
269             break;
270         }
271         auto front = tasks_.back();
272         // 2. if task should have executed already, invoke it.
273         if (curTime >= front.executeTime) {
274             // execute
275             tasks_.pop_back();
276             execToken_ = front.token;
277             lock.unlock();
278             auto q = GetTaskQueueRegistry().SetCurrentTaskQueue(GetSelf<ITaskQueue>());
279             auto ret = front.operation->Invoke();
280             GetTaskQueueRegistry().SetCurrentTaskQueue(q);
281             if (!ret) {
282                 // if the task is to be terminated, do it here.
283                 front = {};
284             }
285             lock.lock();
286             if (ret && execToken_) {
287                 rearm.emplace_back(front.token, front.delay, front.executeTime, BASE_NS::move(front.operation));
288             }
289             execToken_ = { 0 };
290             continue;
291         }
292         break;
293     }
294     // cancel timeout if there is one
295     CancelTimeout(env);
296 
297     if (!rearm.empty()) {
298         // just add them..
299         for (auto t : rearm) {
300             // calculate the next executeTime in phase.. (ie. how many events missed)
301             const auto dt = t.delay;
302             const auto ct = curTime;
303             if (dt > TimeSpan::Zero()) {
304                 auto et = t.executeTime;
305                 // calculate the next executeTime in phase..
306                 auto ticks = ((ct - et) + dt) / dt;
307                 // and based on the "ticks" we can now count the next execution time.
308                 et += (ticks * dt);
309                 AddTaskImpl(t.token, t.operation, dt, et);
310             } else {
311                 // re-scheduling zero delay ones..
312                 // so zero delay, and now.
313                 AddTaskImpl(t.token, t.operation, TimeSpan::Zero(), ct);
314             }
315         }
316     }
317 
318     if (!tasks_.empty()) {
319         // 3. schedule a timer for remaining tasks. (using the front most time)
320         const auto& front = tasks_.back();
321         int64_t delayMS = (front.executeTime - curTime).ToMilliseconds();
322         // clamp to at most 10 seconds of wait and at minimum 0 milliseconds.
323         uint32_t trigger = BASE_NS::Math::max(int64_t(0), BASE_NS::Math::min(int64_t(10000), delayMS));
324         SetTimeout(env, trigger);
325     }
326 
327     if (tasks_.empty()) {
328         // queue empty, we may be allowed to release resourcec.
329         if (refcnt_ == 0) {
330             if (tsf_) {
331                 // release the TSF, we have given permission to terminate.
332                 auto tsf = tsf_;
333                 tsf_ = nullptr;
334                 napi_release_threadsafe_function(tsf, napi_threadsafe_function_release_mode::napi_tsfn_abort);
335             }
336             RunFunc_.Reset();
337         }
338     }
339 }
340 
RescheduleTimer()341 bool NodeJSTaskQueue::RescheduleTimer()
342 {
343     std::unique_lock lock { mutex_ };
344     if (IsInQueue()) {
345         // call directly as we are in JS thread.
346         Schedule(env_, lock);
347         return true;
348     }
349     if (!tsf_) {
350         // tsf does not exist anymore, so stop scheduling.
351         return false;
352     }
353     if (!scheduled_) {
354         // TSF not queued yet.
355         // use tsf to schedule
356         auto res = napi_acquire_threadsafe_function(tsf_);
357         if (res != napi_ok) {
358             // could not acquire the function anymore.
359             // (most likely cleanup already in progress)
360             // fail.
361             return false;
362         }
363         void* data = nullptr;
364         scheduled_ = true;
365         res = napi_call_threadsafe_function(tsf_, data, napi_threadsafe_function_call_mode::napi_tsfn_blocking);
366         if (res == napi_closing) {
367             // clean up in progress.
368             return false;
369         }
370         res = napi_release_threadsafe_function(tsf_, napi_threadsafe_function_release_mode::napi_tsfn_release);
371         if (res != napi_ok) {
372             return false;
373         }
374     }
375     return true;
376 }
377 
Cleanup(napi_env env,void * finalize_data,void * context)378 void NodeJSTaskQueue::Cleanup(napi_env env, void* finalize_data, void* context)
379 {
380     auto* me = static_cast<NodeJSTaskQueue*>(context);
381     // we actually do not need to do anything here now.
382 }
Invoke(napi_env env,napi_value js_callback,void * context,void * data)383 void NodeJSTaskQueue::Invoke(napi_env env, napi_value js_callback, void* context, void* data)
384 {
385     auto* me = static_cast<NodeJSTaskQueue*>(context);
386     if (me) {
387         std::unique_lock lock { me->mutex_ };
388         me->scheduled_ = false;
389         me->Schedule(env, lock);
390     }
391 }
IsInQueue()392 bool NodeJSTaskQueue::IsInQueue()
393 {
394     auto tmp = GetTaskQueueRegistry().GetCurrentTaskQueue();
395     if (!tmp) {
396         return false;
397     }
398     // this is the correct way to compare random objects.
399     auto i1 = tmp->GetInterface(CORE_NS::IInterface::UID);
400     auto i2 = GetInterface(CORE_NS::IInterface::UID);
401     return (i1 == i2);
402 }
CancelTask(Token token)403 void NodeJSTaskQueue::CancelTask(Token token)
404 {
405     if (!token) {
406         // invalid parameter.
407         return;
408     }
409     bool sameThread = false;
410     auto curThread = std::this_thread::get_id();
411     if (IsInQueue()) {
412         // currently executing tasks in this thread.
413         sameThread = true;
414     }
415     if (curThread != execThread_) {
416         sameThread = true;
417     }
418     std::unique_lock lock { mutex_ };
419     Token executingToken = execToken_;
420     if (token == execToken_) {
421         // Currently executing task is requested to cancel.
422         // Tasks are temporarily removed from the queue while execution, so the currently running task is not in
423         // the queue anymore. Setting execToken_ to null will cause the task to not be re-added.
424         execToken_ = nullptr;
425     }
426 
427     // If we are currently executing the task in different thread, wait for it to complete.
428     // ie. called OUTSIDE the queue.
429     if (!sameThread) {
430         while (token == executingToken) {
431             lock.unlock();
432             // sleep a bit.
433             std::this_thread::yield();
434             lock.lock();
435             executingToken = execToken_;
436         }
437     }
438 
439     // Remove all tasks from the queue, with the same token. (if any)
440     // One can push the same task to the queue multiple times currently.
441     // (ie. you "can" schedule the same task with different "delays")
442     // So we remove all scheduled tasks with same token.
443     // Also redo/rearm might have add the task back while we were waiting/yielding.
444     for (auto it = tasks_.begin(); it != tasks_.end();) {
445         if (it->token == token) {
446             it = tasks_.erase(it);
447         } else {
448             it++;
449         }
450     }
451     // trigger js side to schedule the tasks!
452     RescheduleTimer();
453 }
454 
AddTaskImpl(Token ret,ITaskQueueTask::Ptr p,const TimeSpan & delay,const TimeSpan & excTime)455 void NodeJSTaskQueue::AddTaskImpl(Token ret, ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
456 {
457     if (auto i = interface_cast<ITaskScheduleInfo>(p)) {
458         i->SetQueueAndToken(GetSelf<ITaskQueue>(), ret);
459     }
460 
461     // insertion sort the tasks
462     //
463     // early out trivial cases (empty list, insert first, insert last..)
464     if (tasks_.empty()) {
465         tasks_.emplace_back(ret, delay, excTime, BASE_NS::move(p));
466     } else if (excTime <= tasks_.back().executeTime) {
467         // new task should execute before the current first one.
468         tasks_.emplace_back(ret, delay, excTime, BASE_NS::move(p));
469     } else if (excTime > tasks_.front().executeTime) {
470         // new task should execute after the current last one.
471         tasks_.emplace_front(ret, delay, excTime, BASE_NS::move(p));
472     } else {
473         // finally resort to linear search..
474         // (could use a smarter search but expect that there are not too many tasks in queue)
475         for (auto it = tasks_.begin(); it != tasks_.end(); ++it) {
476             if (it->executeTime <= excTime) {
477                 // task in list should execute after us, so insert there.
478                 tasks_.insert(it, { ret, delay, excTime, BASE_NS::move(p) });
479                 break;
480             }
481         }
482     }
483 }
MakeToken(const ITaskQueueTask::Ptr p)484 Token NodeJSTaskQueue::MakeToken(const ITaskQueueTask::Ptr p)
485 {
486     // use the old cheap "task raw pointer as token" method.
487     return p.get();
488 }
AddTask(ITaskQueueTask::Ptr p,const TimeSpan & delay,const TimeSpan & excTime)489 Token NodeJSTaskQueue::AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
490 {
491     if (!p) {
492         return nullptr;
493     }
494     std::unique_lock lock { mutex_ };
495     // unique
496     Token result = MakeToken(p);
497     AddTaskImpl(result, BASE_NS::move(p), delay, excTime);
498 
499     // trigger js side to schedule the tasks!
500     if (!RescheduleTimer()) {
501         // can not schedule tasks anymore.
502         // remove the task we TRIED to add..
503         for (auto it = tasks_.begin(); it != tasks_.end(); it++) {
504             if ((it->token = result) && (it->executeTime == excTime) && (it->delay == delay)) {
505                 tasks_.erase(it);
506                 break;
507             }
508         }
509         result = { nullptr };
510     }
511     return result;
512 }
513 
AddTask(ITaskQueueTask::Ptr p)514 Token NodeJSTaskQueue::AddTask(ITaskQueueTask::Ptr p)
515 {
516     return AddTask(BASE_NS::move(p), TimeSpan::Milliseconds(0));
517 }
518 
AddTask(ITaskQueueTask::Ptr p,const TimeSpan & delay)519 Token NodeJSTaskQueue::AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay)
520 {
521     return AddTask(BASE_NS::move(p), delay, TimeCeil() + delay);
522 }
523 
AddWaitableTask(ITaskQueueWaitableTask::Ptr p)524 IFuture::Ptr NodeJSTaskQueue::AddWaitableTask(ITaskQueueWaitableTask::Ptr p)
525 {
526     if (IsInQueue()) {
527         // warning! this may cause a deadlock.
528         LOG_W("Adding a waitable task to the same thread! Wait may never complete!");
529     }
530 
531     auto promise = GetObjectRegistry().Create<IPromise>(META_NS::ClassId::Promise);
532     BASE_NS::shared_ptr<PromisedQueueTask> task(new PromisedQueueTask(p, move(promise)));
533     auto f = task->GetFuture();
534     if (!AddTask(BASE_NS::move(task))) {
535         // could not schedule the task.
536         // so the promise is abandoned.
537         promise->SetAbandoned();
538     }
539     return f;
540 }
541 
Now() const542 std::chrono::high_resolution_clock::duration NodeJSTaskQueue::Now() const
543 {
544     using namespace std::chrono;
545     return high_resolution_clock::now().time_since_epoch() - epoch_;
546 }
TimeFloor() const547 TimeSpan NodeJSTaskQueue::TimeFloor() const
548 {
549     using namespace std::chrono;
550     return TimeSpan::Microseconds(floor<microseconds>(Now()).count());
551 }
TimeCeil() const552 TimeSpan NodeJSTaskQueue::TimeCeil() const
553 {
554     using namespace std::chrono;
555     return TimeSpan::Microseconds(ceil<microseconds>(Now()).count());
556 }
557 
GetOrCreateNodeTaskQueue(napi_env e)558 META_NS::ITaskQueue::Ptr GetOrCreateNodeTaskQueue(napi_env e)
559 {
560     auto& tr = GetTaskQueueRegistry();
561     if (const auto alreadyRegistered = tr.GetTaskQueue(JS_THREAD_DEP)) {
562         return alreadyRegistered;
563     }
564 
565     auto& obr = GetObjectRegistry();
566     obr.RegisterObjectType<NodeJSTaskQueue>();
567 
568     auto params = obr.Create<IMetadata>(META_NS::ClassId::Object);
569     if (!params) {
570         return {};
571     }
572     auto p = ConstructProperty<uintptr_t>("env", uintptr_t(e), ObjectFlagBits::INTERNAL | ObjectFlagBits::NATIVE);
573     if (!p) {
574         return {};
575     }
576     params->AddProperty(p);
577     const auto result = obr.Create<ITaskQueue>(::ClassId::NodeJSTaskQueue, params);
578     if (result) {
579         tr.RegisterTaskQueue(result, JS_THREAD_DEP);
580     }
581     return result;
582 }
583 
DeinitNodeTaskQueue()584 bool DeinitNodeTaskQueue()
585 {
586     auto& tr = GetTaskQueueRegistry();
587     auto tq = tr.GetTaskQueue(JS_THREAD_DEP);
588     if (!tq) {
589         // already deinitialized
590         return true;
591     }
592     if (!tq->GetInterface<INodeJSTaskQueue>()->IsReleased()) {
593         // Unsafe deinitialize
594         return false;
595     }
596     // can be safely unregistered.
597     // expect the instance to be destroyed now.
598     tr.UnregisterTaskQueue(JS_THREAD_DEP);
599     GetTaskQueueRegistry().SetCurrentTaskQueue({});
600     return true;
601 }
602