/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "libpandabase/taskmanager/task_scheduler.h"
#include "libpandabase/utils/logger.h"
#include "libpandabase/taskmanager/task_statistics/fine_grained_task_statistics_impl.h"
#include "libpandabase/taskmanager/task_statistics/simple_task_statistics_impl.h"
#include "libpandabase/taskmanager/task_statistics/lock_free_task_statistics_impl.h"

namespace panda::taskmanager {

TaskScheduler *TaskScheduler::instance_ = nullptr;

TaskScheduler::TaskScheduler(size_t workersCount, TaskStatisticsImplType taskStatisticsType)
    : workersCount_(workersCount), gen_(std::random_device()())
{
    switch (taskStatisticsType) {
        case TaskStatisticsImplType::FINE_GRAINED:
            taskStatistics_ = new FineGrainedTaskStatisticsImpl();
            break;
        case TaskStatisticsImplType::SIMPLE:
            taskStatistics_ = new SimpleTaskStatisticsImpl();
            break;
        case TaskStatisticsImplType::LOCK_FREE:
            taskStatistics_ = new LockFreeTaskStatisticsImpl();
            break;
        default:
            UNREACHABLE();
    }
}

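// A minimal lifecycle sketch (illustrative only; the queue object and the argument values below are
// placeholders, and the exact setup depends on the embedder):
//
//   auto *ts = TaskScheduler::Create(4U, TaskStatisticsImplType::SIMPLE);  // single instance per process
//   ts->RegisterQueue(queue);   // register every queue before Initialize()
//   ts->Initialize();           // spawns workersCount_ worker threads
//   // ... tasks are added to the queues and executed by the workers ...
//   ts->Finalize();             // waits for all tasks, then joins and deletes the workers
//   TaskScheduler::Destroy();   // all queues must already be deregistered; deletes the singleton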
/* static */
TaskScheduler *TaskScheduler::Create(size_t threadsCount, TaskStatisticsImplType taskStatisticsType)
{
    ASSERT(instance_ == nullptr);
    ASSERT(threadsCount > 0);
    instance_ = new TaskScheduler(threadsCount, taskStatisticsType);
    return instance_;
}

/* static */
TaskScheduler *TaskScheduler::GetTaskScheduler()
{
    return instance_;
}

/* static */
void TaskScheduler::Destroy()
{
    ASSERT(instance_ != nullptr);
    delete instance_;
    instance_ = nullptr;
}

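/// Registers @p queue under a TaskQueueId built from its (TaskType, VMType) pair. Must be called
/// before Initialize(). Returns INVALID_TASKQUEUE_ID if a queue with the same id is already
/// registered; otherwise installs the new-tasks callback and returns the id.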
TaskQueueId TaskScheduler::RegisterQueue(internal::SchedulableTaskQueueInterface *queue)
{
    os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
    ASSERT(!start_);
    TaskQueueId id(queue->GetTaskType(), queue->GetVMType());
    if (taskQueues_.find(id) != taskQueues_.end()) {
        return INVALID_TASKQUEUE_ID;
    }
    taskQueues_[id] = queue;
    queue->SetNewTasksCallback([this](TaskProperties properties, size_t count, bool wasEmpty) {
        this->IncrementCounterOfAddedTasks(properties, count, wasEmpty);
    });
    return id;
}

void TaskScheduler::Initialize()
{
    ASSERT(!start_);
    start_ = true;
    LOG(DEBUG, RUNTIME) << "TaskScheduler: creates " << workersCount_ << " threads";
    for (size_t i = 0; i < workersCount_; i++) {
        workers_.push_back(new WorkerThread(
            [this](const TaskPropertiesCounterMap &counterMap) { this->IncrementCounterOfExecutedTasks(counterMap); }));
    }
}

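/// Fills @p worker with up to @p tasksCount tasks. Blocks on queuesWaitCondVar_ while every
/// registered queue is empty. Returns true when the scheduler is finalizing and the worker should
/// leave its WorkerLoop, false after tasks have been selected and handed to the worker.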
bool TaskScheduler::FillWithTasks(WorkerThread *worker, size_t tasksCount)
{
    ASSERT(start_);
    os::memory::LockHolder workerLockHolder(workersLock_);
    LOG(DEBUG, RUNTIME) << "TaskScheduler: FillWithTasks";
    {
        os::memory::LockHolder taskSchedulerLockHolder(taskSchedulerStateLock_);
        while (AreQueuesEmpty()) {
            /// We exit when finish_ == true: the TaskScheduler is being finalized, so the worker
            /// wakes up and finishes its WorkerLoop.
            if (finish_) {
                LOG(DEBUG, RUNTIME) << "TaskScheduler: FillWithTasks: no tasks left, worker will finish";
                return true;
            }
            queuesWaitCondVar_.Wait(&taskSchedulerStateLock_);
        }
    }
    os::memory::LockHolder popLockHolder(popFromTaskQueuesLock_);
    SelectNextTasks(tasksCount);
    PutTasksInWorker(worker);
    LOG(DEBUG, RUNTIME) << "TaskScheduler: FillWithTasks: worker was filled with tasks";
    return false;
}

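/// Decides which queues the next @p tasksCount tasks are taken from. The choice is a weighted
/// random draw: UpdateKineticPriorities() builds a cumulative-priority map, a value in
/// [0, kineticMax) is drawn for every task, and upper_bound() maps it to a queue, so queues with a
/// higher priority are selected proportionally more often. The result accumulates in selectedQueues_.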
void TaskScheduler::SelectNextTasks(size_t tasksCount)
{
    LOG(DEBUG, RUNTIME) << "TaskScheduler: SelectNextTasks()";
    if (AreQueuesEmpty()) {
        return;
    }
    UpdateKineticPriorities();
    // Now kineticPriorities_ maps the cumulative priorities of all non-empty queues to the queues themselves
    size_t kineticMax = 0;
    std::tie(kineticMax, std::ignore) = *kineticPriorities_.rbegin();  // Get key of the last element in map
    std::uniform_int_distribution<size_t> distribution(0U, kineticMax - 1U);

    for (size_t i = 0; i < tasksCount; i++) {
        size_t choice = distribution(gen_);  // Get random number in range [0, kinetic_max)
        internal::SchedulableTaskQueueInterface *queue = nullptr;
        std::tie(std::ignore, queue) = *kineticPriorities_.upper_bound(choice);  // Get queue of chosen element

        TaskQueueId id(queue->GetTaskType(), queue->GetVMType());
        selectedQueues_[id]++;
    }
}

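/// Updates kineticPriorities_: for every non-empty queue the running sum of queue priorities is
/// used as the map key, forming the cumulative distribution that SelectNextTasks() samples via upper_bound().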
void TaskScheduler::UpdateKineticPriorities()
{
    ASSERT(!taskQueues_.empty());  // no TaskQueues
    size_t kineticSum = 0;
    internal::SchedulableTaskQueueInterface *queue = nullptr;
    for (const auto &idQueuePair : taskQueues_) {
        std::tie(std::ignore, queue) = idQueuePair;
        if (queue->IsEmpty()) {
            continue;
        }
        kineticSum += queue->GetPriority();
        kineticPriorities_[kineticSum] = queue;
    }
}

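/// Moves the tasks counted in selectedQueues_ from their queues into @p worker via
/// PopTasksToWorker(), then clears selectedQueues_ and returns how many tasks were transferred.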
size_t TaskScheduler::PutTasksInWorker(WorkerThread *worker)
{
    size_t taskCount = 0;
    for (auto [id, size] : selectedQueues_) {
        if (size == 0) {
            continue;
        }
        auto addTaskFunc = [worker](Task &&task) { worker->AddTask(std::move(task)); };
        size_t queueTaskCount = taskQueues_[id]->PopTasksToWorker(addTaskFunc, size);
        taskCount += queueTaskCount;
        LOG(DEBUG, RUNTIME) << "PutTasksInWorker: worker got " << queueTaskCount << " tasks";
    }
    selectedQueues_.clear();
    return taskCount;
}

bool TaskScheduler::AreQueuesEmpty() const
{
    internal::SchedulableTaskQueueInterface *queue = nullptr;
    for (const auto &traitsQueuePair : taskQueues_) {
        std::tie(std::ignore, queue) = traitsQueuePair;
        if (!queue->IsEmpty()) {
            return false;
        }
    }
    return true;
}

bool TaskScheduler::AreNoMoreTasks() const
{
    return taskStatistics_->GetCountOfTaskInSystem() == 0;
}

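/// Pops a single task with the given @p properties directly from its queue, bypassing the workers.
/// Returns std::nullopt if the queue has no task with the requested execution mode, or if the queue
/// no longer exists while the scheduler is finalizing. Signals finishTasksCondVar_ when the last
/// task with these properties leaves the system.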
std::optional<Task> TaskScheduler::GetTaskFromQueue(TaskProperties properties)
{
    LOG(DEBUG, RUNTIME) << "TaskScheduler: GetTaskFromQueue()";
    os::memory::LockHolder popLockHolder(popFromTaskQueuesLock_);
    internal::SchedulableTaskQueueInterface *queue = nullptr;
    {
        os::memory::LockHolder taskManagerLockHolder(taskSchedulerStateLock_);
        auto taskQueuesIterator = taskQueues_.find({properties.GetTaskType(), properties.GetVMType()});
        if (taskQueuesIterator == taskQueues_.end()) {
            if (finish_) {
                return std::nullopt;
            }
            LOG(FATAL, COMMON) << "Attempt to take a task from a non-existent queue";
        }
        std::tie(std::ignore, queue) = *taskQueuesIterator;
    }
    if (!queue->HasTaskWithExecutionMode(properties.GetTaskExecutionMode())) {
        return std::nullopt;
    }
    // Now we can pop the task from the chosen queue
    auto task = queue->PopTask();
    // Only after popping can we notify the task statistics that the task was POPPED from the queue.
    // This ordering ensures that the number of tasks in the system (see TaskStatistics) stays correct.
    taskStatistics_->IncrementCount(TaskStatus::POPPED, properties, 1);
    if (taskStatistics_->GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
        os::memory::LockHolder taskManagerLockHolder(taskSchedulerStateLock_);
        finishTasksCondVar_.SignalAll();
    }
    return task;
}

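/// Blocks until no task with @p properties remains in the system, then logs the per-status counters
/// and resets them.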
void TaskScheduler::WaitForFinishAllTasksWithProperties(TaskProperties properties)
{
    os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
    while (taskStatistics_->GetCountOfTasksInSystemWithTaskProperties(properties) != 0) {
        finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
    }
    LOG(DEBUG, RUNTIME) << "After waiting for tasks with properties: " << properties
                        << " {added: " << taskStatistics_->GetCount(TaskStatus::ADDED, properties)
                        << ", executed: " << taskStatistics_->GetCount(TaskStatus::EXECUTED, properties)
                        << ", popped: " << taskStatistics_->GetCount(TaskStatus::POPPED, properties) << "}";
    taskStatistics_->ResetCountersWithTaskProperties(properties);
}

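/// Waits until no tasks remain in the system, marks the scheduler as finished, wakes a waiting
/// worker, then joins and deletes all worker threads.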
void TaskScheduler::Finalize()
{
    ASSERT(start_);
    {
        // Wait until all tasks are done
        os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
        while (!AreNoMoreTasks()) {
            finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
        }
        finish_ = true;
        queuesWaitCondVar_.Signal();
    }
    for (auto worker : workers_) {
        worker->Join();
        delete worker;
    }
    taskStatistics_->ResetAllCounters();
    LOG(DEBUG, RUNTIME) << "TaskScheduler: Finalized";
}

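/// New-tasks callback installed on every registered queue (see RegisterQueue): records ADDED tasks
/// and wakes a worker waiting in FillWithTasks when the queue transitions from empty to non-empty.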
void TaskScheduler::IncrementCounterOfAddedTasks(TaskProperties properties, size_t ivalue, bool wasEmpty)
{
    taskStatistics_->IncrementCount(TaskStatus::ADDED, properties, ivalue);
    if (wasEmpty) {
        os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
        queuesWaitCondVar_.Signal();
    }
}

void TaskScheduler::IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap &counterMap)
{
    for (const auto &[properties, count] : counterMap) {
        taskStatistics_->IncrementCount(TaskStatus::EXECUTED, properties, count);
        if (taskStatistics_->GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
            os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
            finishTasksCondVar_.SignalAll();
        }
    }
}

TaskScheduler::~TaskScheduler()
{
    // TaskScheduler can be deleted only if it was never started or if it has already been finalized
    ASSERT(start_ == finish_);
    // Check that all task queues have been deleted
    ASSERT(taskQueues_.empty());
    delete taskStatistics_;
    LOG(DEBUG, RUNTIME) << "TaskScheduler: ~TaskScheduler: All threads finished jobs";
}

}  // namespace panda::taskmanager