• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef META_SRC_TASK_QUEUE_H
17 #define META_SRC_TASK_QUEUE_H
18 
19 #include <deque>
20 #include <mutex>
21 #include <thread>
22 
23 #include <base/containers/vector.h>
24 
25 #include <meta/interface/intf_clock.h>
26 #include <meta/interface/intf_task_queue.h>
27 
META_BEGIN_NAMESPACE()28 META_BEGIN_NAMESPACE()
29 
30 class TaskQueueImpl : public ITaskQueueExtend {
31 public:
32     using Token = ITaskQueue::Token;
33 
34     void SetExtend(ITaskQueueExtend* extend) override
35     {
36         extend_ = extend ? extend : this;
37     }
38     void Shutdown() override
39     {
40         Close();
41     }
42 
43     void CancelTask(Token token)
44     {
45         if (token != nullptr) {
46             std::unique_lock lock { mutex_ };
47             Token executingToken = execToken_;
48             if (token == execToken_) {
49                 // Currently executing task is requested to cancel.
50                 // Tasks are temporarily removed from the queue while execution, so the currently running task is not in
51                 // the queue anymore. Setting execToken_ to null will cause the task to not be re-added.
52                 execToken_ = nullptr;
53             }
54 
55             // If we are currently executing the task in different thread, wait for it to complete.
56             if (std::this_thread::get_id() != execThread_) {
57                 while (!terminate_ && token == executingToken) {
58                     lock.unlock();
59                     // sleep a bit.
60                     std::this_thread::yield();
61                     lock.lock();
62                     executingToken = execToken_;
63                 }
64             }
65 
66             // Remove all tasks from the queue, with the same token. (if any)
67             // One can push the same task to the queue multiple times currently.
68             // (ie. you "can" schedule the same task with different "delays")
69             // So we remove all scheduled tasks with same token.
70             // Also redo/rearm might have add the task back while we were waiting/yielding.
71             for (auto it = tasks_.begin(); it != tasks_.end();) {
72                 if (it->operation.get() == token) {
73                     it = tasks_.erase(it);
74                 } else {
75                     it++;
76                 }
77             }
78         }
79     }
80 
81     Token AddTaskImpl(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
82     {
83         Token ret { p.get() };
84 
85         if (auto i = interface_cast<ITaskScheduleInfo>(p)) {
86             i->SetQueueAndToken(self_.lock(), ret);
87         }
88 
89         // insertion sort the tasks
90         if (tasks_.empty()) {
91             tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
92         } else if (tasks_.size() == 1) {
93             if (tasks_.front().executeTime >= excTime) {
94                 tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
95             } else {
96                 tasks_.insert(tasks_.begin(), { delay, excTime, BASE_NS::move(p) });
97             }
98         } else {
99             bool found = false;
100             for (auto it = tasks_.begin(); it != tasks_.end(); ++it) {
101                 if (it->executeTime <= excTime) {
102                     // task in list should execute after us, so insert there.
103                     tasks_.insert(it, { delay, excTime, BASE_NS::move(p) });
104                     found = true;
105                     break;
106                 }
107             }
108             if (!found) {
109                 // add last then ..
110                 tasks_.emplace_back(delay, excTime, BASE_NS::move(p));
111             }
112         }
113         return ret;
114     }
115 
116     Token AddTask(ITaskQueueTask::Ptr p, const TimeSpan& delay, const TimeSpan& excTime)
117     {
118         if (p) {
119             std::unique_lock lock { mutex_ };
120             return AddTaskImpl(BASE_NS::move(p), delay, excTime);
121         }
122         return nullptr;
123     }
124 
125     TimeSpan Time() const
126     {
127         using namespace std::chrono;
128         return TimeSpan::Microseconds(
129             duration_cast<microseconds>(high_resolution_clock::now().time_since_epoch()).count());
130     }
131 
132     void ProcessTasks(std::unique_lock<std::mutex>& lock, TimeSpan curTime)
133     {
134         // Must only be called while having the lock
135         BASE_NS::vector<Task> rearm;
136         while (!terminate_ && !tasks_.empty() && curTime >= tasks_.back().executeTime) {
137             auto task = BASE_NS::move(tasks_.back());
138             tasks_.pop_back();
139             execToken_ = task.operation.get();
140             lock.unlock();
141             bool redo = extend_->InvokeTask(task.operation);
142             lock.lock();
143             // Note execToken_ has been set to null if the executing task is cancelled.
144             if ((redo) && (execToken_ != nullptr)) {
145                 // Reschedule the task again.
146                 rearm.emplace_back(BASE_NS::move(task));
147             }
148             execToken_ = nullptr;
149         }
150 
151         // rearm the tasks.. (if we are not shutting down)
152         if (!terminate_) {
153             for (auto it = rearm.rbegin(); it != rearm.rend(); ++it) {
154                 auto& task = *it;
155                 if (task.delay > TimeSpan()) {
156                     // calculate the next executeTime in phase.. (ie. how many events missed)
157                     uint64_t dt = task.delay.ToMicroseconds();
158                     uint64_t et = task.executeTime.ToMicroseconds();
159                     uint64_t ct = curTime.ToMicroseconds();
160                     // calculate the next executeTime in phase..
161                     et += dt;
162                     if (et <= ct) {
163                         // "ticks" how many events would have ran.. (rounded up)
164                         auto ticks = ((ct - et) + (dt - 1)) / dt;
165                         // and based on the "ticks" we can now count the next execution time.
166                         et += (ticks * dt);
167                         CORE_LOG_V("Skipped ticks %d", (int)ticks);
168                     }
169                     task.executeTime = TimeSpan::Microseconds(et);
170                 } else {
171                     task.executeTime = curTime;
172                 }
173                 AddTaskImpl(task.operation, task.delay, task.executeTime);
174             }
175         }
176     }
177 
178     void Close()
179     {
180         std::unique_lock lock { mutex_ };
181         terminate_ = true;
182         tasks_.clear();
183     }
184 
185     struct Task {
186         Task() = default;
187         Task(TimeSpan d, TimeSpan e, const ITaskQueueTask::Ptr& p) : delay(d), executeTime(e), operation(p) {}
188 
189         TimeSpan delay;
190         TimeSpan executeTime;
191         ITaskQueueTask::Ptr operation { nullptr };
192     };
193 
194 protected:
195     std::mutex mutex_;
196 
197     ITaskQueueExtend* extend_ { this };
198     bool terminate_ {};
199     std::thread::id execThread_;
200     // currently running task..
201     Token execToken_ { nullptr };
202     std::deque<Task> tasks_;
203     ITaskQueue::WeakPtr self_;
204 };
205 
206 META_END_NAMESPACE()
207 
208 #endif
209