/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef META_SRC_TASK_QUEUE_H
#define META_SRC_TASK_QUEUE_H

#include <chrono>
#include <deque>
#include <mutex>
#include <thread>

#include <base/containers/vector.h>

#include <core/log.h>

#include <meta/interface/intf_clock.h>
#include <meta/interface/intf_task_queue.h>

META_BEGIN_NAMESPACE()

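// Base implementation shared by the task queue types in this module. It owns the
// scheduling state (sorted task list, cancellation token, termination flag) and
// leaves threading and dispatch to the concrete queue, which drives ProcessTasks().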
class TaskQueueImpl : public ITaskQueueExtend {
public:
    // Opaque handle identifying a scheduled task; the raw task pointer doubles as the token.
    using Token = ITaskQueue::Token;

    void SetExtend(ITaskQueueExtend* extend) override
    {
        // Fall back to ourselves so that extend_ is never null.
        extend_ = extend ? extend : this;
    }
    void Shutdown() override
    {
        Close();
    }

    void CancelTask(Token token)
    {
        if (token != nullptr) {
            std::unique_lock lock { mutex_ };
            Token executingToken = execToken_;
            if (token == execToken_) {
                // The currently executing task is being cancelled. Tasks are temporarily
                // removed from the queue while they execute, so the running task is no
                // longer in the queue. Clearing execToken_ prevents it from being re-added.
                execToken_ = nullptr;
            }

            // If the task is currently executing in a different thread, wait for it to complete.
            if (std::this_thread::get_id() != execThread_) {
                while (!terminate_ && token == executingToken) {
                    lock.unlock();
                    // Yield to give the executing thread a chance to finish.
                    std::this_thread::yield();
                    lock.lock();
                    executingToken = execToken_;
                }
            }

            // Remove all tasks with the same token from the queue (if any).
            // The same task can currently be pushed to the queue multiple times
            // (i.e. you "can" schedule the same task with different "delays"),
            // so we remove every scheduled task with this token.
            // A redo/rearm may also have added the task back while we were waiting/yielding.
            for (auto it = tasks_.begin(); it != tasks_.end();) {
                if (it->operation.get() == token) {
                    it = tasks_.erase(it);
                } else {
                    ++it;
                }
            }
        }
    }

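    // Scheduling invariant: tasks_ is kept sorted by executeTime in descending order,
    // so the task that is due next is always at the back. ProcessTasks() pops from the
    // back, and AddTaskImpl() insertion-sorts new tasks to preserve this ordering.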
    Token AddTaskImpl(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
    {
        Token ret { p.get() };

        if (auto i = interface_cast<ITaskScheduleInfo>(p)) {
            i->SetQueueAndToken(self_.lock(), ret);
        }

        // Insertion sort the task into the queue (descending executeTime, earliest at the back).
        if (tasks_.empty()) {
            tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
        } else if (tasks_.size() == 1) {
            if (tasks_.front().executeTime >= excTime) {
                tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
            } else {
                tasks_.insert(tasks_.begin(), { delay, excTime, BASE_NS::move(p) });
            }
        } else {
            bool found = false;
            for (auto it = tasks_.begin(); it != tasks_.end(); ++it) {
                if (it->executeTime <= excTime) {
                    // This queued task is due no later than the new one, so the new
                    // task goes in front of it (farther from the back of the queue).
                    tasks_.insert(it, { delay, excTime, BASE_NS::move(p) });
                    found = true;
                    break;
                }
            }
            if (!found) {
                // All queued tasks execute later than the new one, so it goes to the back.
                tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
            }
        }
        return ret;
    }

    Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
    {
        if (p) {
            std::unique_lock lock { mutex_ };
            return AddTaskImpl(BASE_NS::move(p), delay, excTime);
        }
        return nullptr;
    }
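
    // Usage sketch (hypothetical caller code; the task construction is illustrative):
    //
    //   ITaskQueueTask::Ptr task = ...;                    // some ITaskQueueTask implementation
    //   const auto delay = TimeSpan::Microseconds(16000);  // ~16 ms period if the task requests redo
    //   Token token = queue.AddTask(task, delay, queue.Time() + delay);
    //   ...
    //   queue.CancelTask(token); // also removes duplicate entries with the same token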

    TimeSpan Time() const
    {
        using namespace std::chrono;
        return TimeSpan::Microseconds(
            duration_cast<microseconds>(high_resolution_clock::now().time_since_epoch()).count());
    }

    void ProcessTasks(std::unique_lock<std::mutex>& lock, TimeSpan curTime)
    {
        // Must only be called while holding the lock.
        BASE_NS::vector<Task> rearm;
        while (!terminate_ && !tasks_.empty() && curTime >= tasks_.back().executeTime) {
            auto task = BASE_NS::move(tasks_.back());
            tasks_.pop_back();
            execToken_ = task.operation.get();
            lock.unlock();
            bool redo = extend_->InvokeTask(task.operation);
            lock.lock();
            // Note: execToken_ has been set to null if the executing task was cancelled.
            if ((redo) && (execToken_ != nullptr)) {
                // Reschedule the task.
                rearm.emplace_back(BASE_NS::move(task));
            }
            execToken_ = nullptr;
        }

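        // Worked example of the catch-up arithmetic below: with delay dt = 30 and a
        // task last scheduled at et = 100, the next in-phase time is et + dt = 130.
        // If curTime ct = 200 the task has fallen behind, so
        // ticks = ((200 - 130) + 29) / 30 = 3 missed slots, and the task is pushed
        // to et = 130 + 3 * 30 = 220, the first in-phase time after ct.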
        // Rearm the tasks (skipped if we are shutting down).
        if (!terminate_) {
            for (auto it = rearm.rbegin(); it != rearm.rend(); ++it) {
                auto& task = *it;
                if (task.delay > TimeSpan()) {
                    // Calculate the next in-phase executeTime (i.e. account for missed events).
                    uint64_t dt = task.delay.ToMicroseconds();
                    uint64_t et = task.executeTime.ToMicroseconds();
                    uint64_t ct = curTime.ToMicroseconds();
                    et += dt;
                    if (et <= ct) {
                        // "ticks" is how many events would have run (rounded up).
                        auto ticks = ((ct - et) + (dt - 1)) / dt;
                        // Based on "ticks" we can count the next execution time.
                        et += (ticks * dt);
                        CORE_LOG_V("Skipped ticks %d", (int)ticks);
                    }
                    task.executeTime = TimeSpan::Microseconds(et);
                } else {
                    task.executeTime = curTime;
                }
                AddTaskImpl(BASE_NS::move(task.operation), task.delay, task.executeTime);
            }
        }
    }

    void Close()
    {
        std::unique_lock lock { mutex_ };
        terminate_ = true;
        tasks_.clear();
    }

    struct Task {
        Task() = default;
        Task(TimeSpan d, TimeSpan e, const ITaskQueueTask::Ptr& p) : delay(d), executeTime(e), operation(p) {}

        TimeSpan delay;
        TimeSpan executeTime;
        ITaskQueueTask::Ptr operation { nullptr };
    };

protected:
    std::mutex mutex_;

    ITaskQueueExtend* extend_ { this };
    bool terminate_ {};
    // Id of the thread executing tasks; set by the concrete queue implementation.
    std::thread::id execThread_;
    // Token of the currently running task (null when no task is executing).
    Token execToken_ { nullptr };
    std::deque<Task> tasks_;
    ITaskQueue::WeakPtr self_;
};
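
// A concrete queue is expected to derive from TaskQueueImpl, set self_ and
// execThread_, and drive ProcessTasks() from its dispatch loop. A minimal
// sketch (hypothetical; the real queue classes in this module differ):
//
//   std::unique_lock lock { mutex_ };
//   while (!terminate_) {
//       ProcessTasks(lock, Time());
//       // ... sleep/wait until the task at tasks_.back() is due ...
//   }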

META_END_NAMESPACE()

#endif // META_SRC_TASK_QUEUE_H