/*
 * Copyright (c) 2024 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef META_SRC_TASK_QUEUE_H
#define META_SRC_TASK_QUEUE_H

#include <chrono>
#include <deque>
#include <mutex>
#include <thread>

#include <base/containers/vector.h>

#include <meta/interface/intf_clock.h>
#include <meta/interface/intf_task_queue.h>

META_BEGIN_NAMESPACE()

31 class TaskQueueImpl : public ITaskQueueExtend {
32 public:
33     using Token = ITaskQueue::Token;
34 
35     void SetExtend(ITaskQueueExtend* extend) override
36     {
37         extend_ = extend ? extend : this;
38     }
39     void Shutdown() override
40     {
41         Close();
42     }
43 
44     void CancelTask(Token token)
45     {
46         if (token != nullptr) {
47             std::unique_lock lock { mutex_ };
48             Token executingToken = execToken_;
49             if (token == execToken_) {
50                 // Currently executing task is requested to cancel.
51                 // Tasks are temporarily removed from the queue while execution, so the currently running task is not in
52                 // the queue anymore. Setting execToken_ to null will cause the task to not be re-added.
53                 execToken_ = nullptr;
54             }
55 
56             // If we are currently executing the task in different thread, wait for it to complete.
57             if (std::this_thread::get_id() != execThread_) {
58                 while (!terminate_ && token == executingToken) {
59                     lock.unlock();
60                     // sleep a bit.
61                     std::this_thread::yield();
62                     lock.lock();
63                     executingToken = execToken_;
64                 }
65             }
66 
67             // Remove all tasks from the queue, with the same token. (if any)
68             // One can push the same task to the queue multiple times currently.
69             // (ie. you "can" schedule the same task with different "delays")
70             // So we remove all scheduled tasks with same token.
71             // Also redo/rearm might have add the task back while we were waiting/yielding.
72             for (auto it = tasks_.begin(); it != tasks_.end();) {
73                 if (it->operation.get() == token) {
74                     it = tasks_.erase(it);
75                 } else {
76                     it++;
77                 }
78             }
79         }
80     }
81 
82     Token AddTaskImpl(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
83     {
84         Token ret { p.get() };
85 
86         if (auto i = interface_cast<ITaskScheduleInfo>(p)) {
87             i->SetQueueAndToken(self_.lock(), ret);
88         }
89 
90         // insertion sort the tasks
91         if (tasks_.empty()) {
92             tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
93         } else if (tasks_.size() == 1) {
94             if (tasks_.front().executeTime >= excTime) {
95                 tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
96             } else {
97                 tasks_.insert(tasks_.begin(), { delay, excTime, BASE_NS::move(p) });
98             }
99         } else {
100             bool found = false;
101             for (auto it = tasks_.begin(); it != tasks_.end(); ++it) {
102                 if (it->executeTime <= excTime) {
103                     // task in list should execute after us, so insert there.
104                     tasks_.insert(it, { delay, excTime, BASE_NS::move(p) });
105                     found = true;
106                     break;
107                 }
108             }
109             if (!found) {
110                 // add last then ..
111                 tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
112             }
113         }
114         return ret;
115     }
116 
117     Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
118     {
119         if (p) {
120             std::unique_lock lock { mutex_ };
121             return AddTaskImpl(BASE_NS::move(p), delay, excTime);
122         }
123         return nullptr;
124     }
125 
126     TimeSpan Time() const
127     {
128         using namespace std::chrono;
129         return TimeSpan::Microseconds(
130             duration_cast<microseconds>(high_resolution_clock::now().time_since_epoch()).count());
131     }
132 
133     void ProcessTasks(std::unique_lock<std::mutex>& lock, TimeSpan curTime)
134     {
135         // Must only be called while having the lock
136         BASE_NS::vector<Task> rearm;
137         while (!terminate_ && !tasks_.empty() && curTime >= tasks_.back().executeTime) {
138             auto task = BASE_NS::move(tasks_.back());
139             tasks_.pop_back();
140             execToken_ = task.operation.get();
141             lock.unlock();
142             bool redo = extend_->InvokeTask(task.operation);
143             lock.lock();
144             // Note execToken_ has been set to null if the executing task is cancelled.
145             if ((redo) && (execToken_ != nullptr)) {
146                 // Reschedule the task again.
147                 rearm.emplace_back(BASE_NS::move(task));
148             }
149             execToken_ = nullptr;
150         }
151 
152         // rearm the tasks.. (if we are not shutting down)
153         if (!terminate_) {
154             for (auto it = rearm.rbegin(); it != rearm.rend(); ++it) {
155                 auto& task = *it;
156                 if (task.delay > TimeSpan()) {
157                     // calculate the next executeTime in phase.. (ie. how many events missed)
158                     uint64_t dt = task.delay.ToMicroseconds();
159                     uint64_t et = task.executeTime.ToMicroseconds();
160                     uint64_t ct = curTime.ToMicroseconds();
161                     // calculate the next executeTime in phase..
162                     et += dt;
163                     if (et <= ct) {
164                         // "ticks" how many events would have ran.. (rounded up)
165                         auto ticks = ((ct - et) + (dt - 1)) / dt;
166                         // and based on the "ticks" we can now count the next execution time.
167                         et += (ticks * dt);
168                         CORE_LOG_V("Skipped ticks %d", (int)ticks);
169                     }
170                     task.executeTime = TimeSpan::Microseconds(et);
171                 } else {
172                     task.executeTime = curTime;
173                 }
174                 AddTaskImpl(task.operation, task.delay, task.executeTime);
175             }
176         }
177     }
178 
179     void Close()
180     {
181         std::unique_lock lock { mutex_ };
182         terminate_ = true;
183         tasks_.clear();
184     }
185 
186     struct Task {
187         Task() = default;
188         Task(TimeSpan d, TimeSpan e, const ITaskQueueTask::Ptr& p) : delay(d), executeTime(e), operation(p) {}
189 
190         TimeSpan delay;
191         TimeSpan executeTime;
192         ITaskQueueTask::Ptr operation { nullptr };
193     };
194 
195 protected:
196     std::mutex mutex_;
197 
198     ITaskQueueExtend* extend_ { this };
199     bool terminate_ {};
200     std::thread::id execThread_;
201     // currently running task..
202     Token execToken_ { nullptr };
203     std::deque<Task> tasks_;
204     ITaskQueue::WeakPtr self_;
205 };

META_END_NAMESPACE()

#endif