/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include <condition_variable>
17 #include <mutex>
18 #include <thread>
19 
20 #include <base/containers/vector.h>
21 
22 #include <meta/base/interface_macros.h>
23 #include <meta/base/namespace.h>
24 #include <meta/interface/intf_task_queue.h>
25 #include <meta/interface/intf_task_queue_registry.h>
26 
27 #include "future.h"
28 #include "object.h"
29 #include "task_queue.h"
30 
31 META_BEGIN_NAMESPACE()
32 
33 class ThreadedTaskQueue : public IntroduceInterfaces<MetaObject, IThreadedTaskQueue, TaskQueueImpl> {
34     META_OBJECT(ThreadedTaskQueue, ClassId::ThreadedTaskQueue, IntroduceInterfaces)
35 public:
36     using Token = ITaskQueue::Token;
37 
38     META_NO_COPY_MOVE(ThreadedTaskQueue)
39 
40     ThreadedTaskQueue() = default;
~ThreadedTaskQueue()41     ~ThreadedTaskQueue() override
42     {
43         Shutdown();
44     }
45 
Build(const IMetadata::Ptr & data)46     bool Build(const IMetadata::Ptr& data) override
47     {
48         bool ret = Super::Build(data);
49         if (ret) {
50             self_ = GetSelf<ITaskQueue>();
51             thread_ = std::thread([this]() { ProcessTasks(); });
52         }
53         return ret;
54     }
55 
InvokeTask(const ITaskQueueTask::Ptr & task)56     bool InvokeTask(const ITaskQueueTask::Ptr& task) override
57     {
58         if (task) {
59         auto q = GetTaskQueueRegistry().SetCurrentTaskQueue(self_);
60         auto ret = task->Invoke();
61         GetTaskQueueRegistry().SetCurrentTaskQueue(q);
62         return ret;
63         }
64         return false;
65     }
66 
Shutdown()67     void Shutdown() override
68     {
69         Close();
70         addCondition_.notify_one();
71         if (thread_.joinable()) {
72             thread_.join();
73         }
74     }
75 
CancelTask(Token token)76     void CancelTask(Token token) override
77     {
78         TaskQueueImpl::CancelTask(token);
79     }
80 
AddTask(ITaskQueueTask::Ptr p)81     Token AddTask(ITaskQueueTask::Ptr p) override
82     {
83         return AddTask(BASE_NS::move(p), TimeSpan::Milliseconds(0));
84     }
85 
AddTask(ITaskQueueTask::Ptr p,const TimeSpan & delay)86     Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay) override
87     {
88         auto t = TaskQueueImpl::AddTask(BASE_NS::move(p), delay, Time() + delay);
89         if (t) {
90             addCondition_.notify_one();
91         }
92         return t;
93     }
94 
AddWaitableTask(ITaskQueueWaitableTask::Ptr p)95     IFuture::Ptr AddWaitableTask(ITaskQueueWaitableTask::Ptr p) override
96     {
97         IPromise::Ptr promise(new Promise);
98         BASE_NS::shared_ptr<PromisedQueueTask> task(new PromisedQueueTask(BASE_NS::move(p), promise));
99         auto f = task->GetFuture();
100         AddTask(BASE_NS::move(task));
101         return f;
102     }
103 
ProcessTasks()104     void ProcessTasks()
105     {
106         std::unique_lock lock { mutex_ };
107         execThread_ = std::this_thread::get_id();
108         while (!terminate_) {
109             if (!tasks_.empty()) {
110                 TimeSpan delta = tasks_.back().executeTime - Time();
111                 // wait for next execute time (or trigger which ever is first). and see if we can now process things..
112                 // technically we will always be a bit late here. "it's a best effort"
113                 if (delta > TimeSpan::Microseconds(0)) {
114                     addCondition_.wait_for(lock, std::chrono::microseconds(delta.ToMicroseconds()));
115                 }
116             } else {
117                 // infinite wait, since the queue is empty..
118                 addCondition_.wait(lock);
119             }
120             auto curTime = Time();
121             TaskQueueImpl::ProcessTasks(lock, curTime);
122         }
123     }
124 
125 private:
126     std::condition_variable addCondition_;
127     std::thread thread_;
128 };
129 
130 namespace Internal {
131 
GetThreadedTaskQueueFactory()132 IObjectFactory::Ptr GetThreadedTaskQueueFactory()
133 {
134     return ThreadedTaskQueue::GetFactory();
135 }
136 
137 } // namespace Internal
138 
139 META_END_NAMESPACE()
140