1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <condition_variable>
17 #include <mutex>
18 #include <thread>
19
20 #include <base/containers/vector.h>
21
22 #include <meta/base/interface_macros.h>
23 #include <meta/base/namespace.h>
24 #include <meta/interface/intf_task_queue.h>
25 #include <meta/interface/intf_task_queue_registry.h>
26
27 #include "future.h"
28 #include "object.h"
29 #include "task_queue.h"
30
31 META_BEGIN_NAMESPACE()
32
33 class ThreadedTaskQueue : public IntroduceInterfaces<MetaObject, IThreadedTaskQueue, TaskQueueImpl> {
34 META_OBJECT(ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IntroduceInterfaces)
35 public:
36 using Token = ITaskQueue::Token;
37
38 META_NO_COPY_MOVE(ThreadedTaskQueue)
39
40 ThreadedTaskQueue() = default;
~ThreadedTaskQueue()41 ~ThreadedTaskQueue() override
42 {
43 Shutdown();
44 }
45
Build(const IMetadata::Ptr & data)46 bool Build(const IMetadata::Ptr& data) override
47 {
48 bool ret = Super::Build(data);
49 if (ret) {
50 self_ = GetSelf<ITaskQueue>();
51 thread_ = std::thread([this]() { ProcessTasks(); });
52 }
53 return ret;
54 }
55
InvokeTask(const ITaskQueueTask::Ptr & task)56 bool InvokeTask(const ITaskQueueTask::Ptr& task) override
57 {
58 if (task) {
59 auto q = GetTaskQueueRegistry().SetCurrentTaskQueue(self_);
60 auto ret = task->Invoke();
61 GetTaskQueueRegistry().SetCurrentTaskQueue(q);
62 return ret;
63 }
64 return false;
65 }
66
Shutdown()67 void Shutdown() override
68 {
69 Close();
70 addCondition_.notify_one();
71 if (thread_.joinable()) {
72 thread_.join();
73 }
74 }
75
CancelTask(Token token)76 void CancelTask(Token token) override
77 {
78 TaskQueueImpl::CancelTask(token);
79 }
80
AddTask(ITaskQueueTask::Ptr p)81 Token AddTask(ITaskQueueTask::Ptr p) override
82 {
83 return AddTask(BASE_NS::move(p), TimeSpan::Milliseconds(0));
84 }
85
AddTask(ITaskQueueTask::Ptr p,const TimeSpan & delay)86 Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay) override
87 {
88 auto t = TaskQueueImpl::AddTask(BASE_NS::move(p), delay, Time() + delay);
89 if (t) {
90 addCondition_.notify_one();
91 }
92 return t;
93 }
94
AddWaitableTask(ITaskQueueWaitableTask::Ptr p)95 IFuture::Ptr AddWaitableTask(ITaskQueueWaitableTask::Ptr p) override
96 {
97 IPromise::Ptr promise(new Promise);
98 BASE_NS::shared_ptr<PromisedQueueTask> task(new PromisedQueueTask(BASE_NS::move(p), promise));
99 auto f = task->GetFuture();
100 AddTask(BASE_NS::move(task));
101 return f;
102 }
103
ProcessTasks()104 void ProcessTasks()
105 {
106 std::unique_lock lock { mutex_ };
107 execThread_ = std::this_thread::get_id();
108 while (!terminate_) {
109 if (!tasks_.empty()) {
110 TimeSpan delta = tasks_.back().executeTime - Time();
111 // wait for next execute time (or trigger which ever is first). and see if we can now process things..
112 // technically we will always be a bit late here. "it's a best effort"
113 if (delta > TimeSpan::Microseconds(0)) {
114 addCondition_.wait_for(lock, std::chrono::microseconds(delta.ToMicroseconds()));
115 }
116 } else {
117 // infinite wait, since the queue is empty..
118 addCondition_.wait(lock);
119 }
120 auto curTime = Time();
121 TaskQueueImpl::ProcessTasks(lock, curTime);
122 }
123 }
124
125 private:
126 std::condition_variable addCondition_;
127 std::thread thread_;
128 };
129
130 namespace Internal {
131
GetThreadedTaskQueueFactory()132 IObjectFactory::Ptr GetThreadedTaskQueueFactory()
133 {
134 return ThreadedTaskQueue::GetFactory();
135 }
136
137 } // namespace Internal
138
139 META_END_NAMESPACE()
140