1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef META_SRC_TASK_QUEUE_H
17 #define META_SRC_TASK_QUEUE_H
18
19 #include <chrono>
20 #include <deque>
21 #include <mutex>
22 #include <thread>
23
24 #include <base/containers/vector.h>
25
26 #include <meta/interface/intf_clock.h>
27 #include <meta/interface/intf_task_queue.h>
28
META_BEGIN_NAMESPACE()29 META_BEGIN_NAMESPACE()
30
31 class TaskQueueImpl : public ITaskQueueExtend {
32 public:
33 using Token = ITaskQueue::Token;
34
35 void SetExtend(ITaskQueueExtend* extend) override
36 {
37 extend_ = extend ? extend : this;
38 }
39 void Shutdown() override
40 {
41 Close();
42 }
43
44 void CancelTask(Token token)
45 {
46 if (token != nullptr) {
47 std::unique_lock lock { mutex_ };
48 Token executingToken = execToken_;
49 if (token == execToken_) {
50 // Currently executing task is requested to cancel.
51 // Tasks are temporarily removed from the queue while execution, so the currently running task is not in
52 // the queue anymore. Setting execToken_ to null will cause the task to not be re-added.
53 execToken_ = nullptr;
54 }
55
56 // If we are currently executing the task in different thread, wait for it to complete.
57 if (std::this_thread::get_id() != execThread_) {
58 while (!terminate_ && token == executingToken) {
59 lock.unlock();
60 // sleep a bit.
61 std::this_thread::yield();
62 lock.lock();
63 executingToken = execToken_;
64 }
65 }
66
67 // Remove all tasks from the queue, with the same token. (if any)
68 // One can push the same task to the queue multiple times currently.
69 // (ie. you "can" schedule the same task with different "delays")
70 // So we remove all scheduled tasks with same token.
71 // Also redo/rearm might have add the task back while we were waiting/yielding.
72 for (auto it = tasks_.begin(); it != tasks_.end();) {
73 if (it->operation.get() == token) {
74 it = tasks_.erase(it);
75 } else {
76 it++;
77 }
78 }
79 }
80 }
81
82 Token AddTaskImpl(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
83 {
84 Token ret { p.get() };
85
86 if (auto i = interface_cast<ITaskScheduleInfo>(p)) {
87 i->SetQueueAndToken(self_.lock(), ret);
88 }
89
90 // insertion sort the tasks
91 if (tasks_.empty()) {
92 tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
93 } else if (tasks_.size() == 1) {
94 if (tasks_.front().executeTime >= excTime) {
95 tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
96 } else {
97 tasks_.insert(tasks_.begin(), { delay, excTime, BASE_NS::move(p) });
98 }
99 } else {
100 bool found = false;
101 for (auto it = tasks_.begin(); it != tasks_.end(); ++it) {
102 if (it->executeTime <= excTime) {
103 // task in list should execute after us, so insert there.
104 tasks_.insert(it, { delay, excTime, BASE_NS::move(p) });
105 found = true;
106 break;
107 }
108 }
109 if (!found) {
110 // add last then ..
111 tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
112 }
113 }
114 return ret;
115 }
116
117 Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
118 {
119 if (p) {
120 std::unique_lock lock { mutex_ };
121 return AddTaskImpl(BASE_NS::move(p), delay, excTime);
122 }
123 return nullptr;
124 }
125
126 TimeSpan Time() const
127 {
128 using namespace std::chrono;
129 return TimeSpan::Microseconds(
130 duration_cast<microseconds>(high_resolution_clock::now().time_since_epoch()).count());
131 }
132
133 void ProcessTasks(std::unique_lock<std::mutex>& lock, TimeSpan curTime)
134 {
135 // Must only be called while having the lock
136 BASE_NS::vector<Task> rearm;
137 while (!terminate_ && !tasks_.empty() && curTime >= tasks_.back().executeTime) {
138 auto task = BASE_NS::move(tasks_.back());
139 tasks_.pop_back();
140 execToken_ = task.operation.get();
141 lock.unlock();
142 bool redo = extend_->InvokeTask(task.operation);
143 lock.lock();
144 // Note execToken_ has been set to null if the executing task is cancelled.
145 if ((redo) && (execToken_ != nullptr)) {
146 // Reschedule the task again.
147 rearm.emplace_back(BASE_NS::move(task));
148 }
149 execToken_ = nullptr;
150 }
151
152 // rearm the tasks.. (if we are not shutting down)
153 if (!terminate_) {
154 for (auto it = rearm.rbegin(); it != rearm.rend(); ++it) {
155 auto& task = *it;
156 if (task.delay > TimeSpan()) {
157 // calculate the next executeTime in phase.. (ie. how many events missed)
158 uint64_t dt = task.delay.ToMicroseconds();
159 uint64_t et = task.executeTime.ToMicroseconds();
160 uint64_t ct = curTime.ToMicroseconds();
161 // calculate the next executeTime in phase..
162 et += dt;
163 if (et <= ct) {
164 // "ticks" how many events would have ran.. (rounded up)
165 auto ticks = ((ct - et) + (dt - 1)) / dt;
166 // and based on the "ticks" we can now count the next execution time.
167 et += (ticks * dt);
168 CORE_LOG_V("Skipped ticks %d", (int)ticks);
169 }
170 task.executeTime = TimeSpan::Microseconds(et);
171 } else {
172 task.executeTime = curTime;
173 }
174 AddTaskImpl(task.operation, task.delay, task.executeTime);
175 }
176 }
177 }
178
179 void Close()
180 {
181 std::unique_lock lock { mutex_ };
182 terminate_ = true;
183 tasks_.clear();
184 }
185
186 struct Task {
187 Task() = default;
188 Task(TimeSpan d, TimeSpan e, const ITaskQueueTask::Ptr& p) : delay(d), executeTime(e), operation(p) {}
189
190 TimeSpan delay;
191 TimeSpan executeTime;
192 ITaskQueueTask::Ptr operation { nullptr };
193 };
194
195 protected:
196 std::mutex mutex_;
197
198 ITaskQueueExtend* extend_ { this };
199 bool terminate_ {};
200 std::thread::id execThread_;
201 // currently running task..
202 Token execToken_ { nullptr };
203 std::deque<Task> tasks_;
204 ITaskQueue::WeakPtr self_;
205 };
206
207 META_END_NAMESPACE()
208
209 #endif
210