• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "libpandabase/taskmanager/task_scheduler.h"
17 #include "libpandabase/utils/logger.h"
18 #include "libpandabase/taskmanager/task_statistics/fine_grained_task_statistics_impl.h"
19 #include "libpandabase/taskmanager/task_statistics/simple_task_statistics_impl.h"
20 #include "libpandabase/taskmanager/task_statistics/lock_free_task_statistics_impl.h"
21 
22 namespace panda::taskmanager {
23 
24 TaskScheduler *TaskScheduler::instance_ = nullptr;
25 
TaskScheduler(size_t workersCount,TaskStatisticsImplType taskStatisticsType)26 TaskScheduler::TaskScheduler(size_t workersCount, TaskStatisticsImplType taskStatisticsType)
27     : workersCount_(workersCount), gen_(std::random_device()())
28 {
29     switch (taskStatisticsType) {
30         case TaskStatisticsImplType::FINE_GRAINED:
31             taskStatistics_ = new FineGrainedTaskStatisticsImpl();
32             break;
33         case TaskStatisticsImplType::SIMPLE:
34             taskStatistics_ = new SimpleTaskStatisticsImpl();
35             break;
36         case TaskStatisticsImplType::LOCK_FREE:
37             taskStatistics_ = new LockFreeTaskStatisticsImpl();
38             break;
39         default:
40             UNREACHABLE();
41     }
42 }
43 
44 /* static */
Create(size_t threadsCount,TaskStatisticsImplType taskStatisticsType)45 TaskScheduler *TaskScheduler::Create(size_t threadsCount, TaskStatisticsImplType taskStatisticsType)
46 {
47     ASSERT(instance_ == nullptr);
48     ASSERT(threadsCount > 0);
49     instance_ = new TaskScheduler(threadsCount, taskStatisticsType);
50     return instance_;
51 }
52 
53 /* static */
GetTaskScheduler()54 TaskScheduler *TaskScheduler::GetTaskScheduler()
55 {
56     return instance_;
57 }
58 
59 /* static */
Destroy()60 void TaskScheduler::Destroy()
61 {
62     ASSERT(instance_ != nullptr);
63     delete instance_;
64     instance_ = nullptr;
65 }
66 
RegisterQueue(internal::SchedulableTaskQueueInterface * queue)67 TaskQueueId TaskScheduler::RegisterQueue(internal::SchedulableTaskQueueInterface *queue)
68 {
69     os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
70     ASSERT(!start_);
71     TaskQueueId id(queue->GetTaskType(), queue->GetVMType());
72     if (taskQueues_.find(id) != taskQueues_.end()) {
73         return INVALID_TASKQUEUE_ID;
74     }
75     taskQueues_[id] = queue;
76     queue->SetNewTasksCallback([this](TaskProperties properties, size_t count, bool wasEmpty) {
77         this->IncrementCounterOfAddedTasks(properties, count, wasEmpty);
78     });
79     return id;
80 }
81 
Initialize()82 void TaskScheduler::Initialize()
83 {
84     ASSERT(!start_);
85     start_ = true;
86     LOG(DEBUG, RUNTIME) << "TaskScheduler: creates " << workersCount_ << " threads";
87     for (size_t i = 0; i < workersCount_; i++) {
88         workers_.push_back(new WorkerThread(
89             [this](const TaskPropertiesCounterMap &counterMap) { this->IncrementCounterOfExecutedTasks(counterMap); }));
90     }
91 }
92 
FillWithTasks(WorkerThread * worker,size_t tasksCount)93 bool TaskScheduler::FillWithTasks(WorkerThread *worker, size_t tasksCount)
94 {
95     ASSERT(start_);
96     os::memory::LockHolder workerLockHolder(workersLock_);
97     LOG(DEBUG, RUNTIME) << "TaskScheduler: FillWithTasks";
98     {
99         os::memory::LockHolder taskSchedulerLockHolder(taskSchedulerStateLock_);
100         while (AreQueuesEmpty()) {
101             /// We exit in situation when finish_ = true .TM Finalize, Worker wake up and go finish WorkerLoop.
102             if (finish_) {
103                 LOG(DEBUG, RUNTIME) << "TaskScheduler: FillWithTasks: return queue without issues";
104                 return true;
105             }
106             queuesWaitCondVar_.Wait(&taskSchedulerStateLock_);
107         }
108     }
109     os::memory::LockHolder popLockHolder(popFromTaskQueuesLock_);
110     SelectNextTasks(tasksCount);
111     PutTasksInWorker(worker);
112     LOG(DEBUG, RUNTIME) << "TaskScheduler: FillWithTasks: return queue with tasks";
113     return false;
114 }
115 
SelectNextTasks(size_t tasksCount)116 void TaskScheduler::SelectNextTasks(size_t tasksCount)
117 {
118     LOG(DEBUG, RUNTIME) << "TaskScheduler: SelectNextTasks()";
119     if (AreQueuesEmpty()) {
120         return;
121     }
122     UpdateKineticPriorities();
123     // Now kinetic_priorities_
124     size_t kineticMax = 0;
125     std::tie(kineticMax, std::ignore) = *kineticPriorities_.rbegin();  // Get key of the last element in map
126     std::uniform_int_distribution<size_t> distribution(0U, kineticMax - 1U);
127 
128     for (size_t i = 0; i < tasksCount; i++) {
129         size_t choice = distribution(gen_);  // Get random number in range [0, kinetic_max)
130         internal::SchedulableTaskQueueInterface *queue = nullptr;
131         std::tie(std::ignore, queue) = *kineticPriorities_.upper_bound(choice);  // Get queue of chosen element
132 
133         TaskQueueId id(queue->GetTaskType(), queue->GetVMType());
134         selectedQueues_[id]++;
135     }
136 }
137 
UpdateKineticPriorities()138 void TaskScheduler::UpdateKineticPriorities()
139 {
140     ASSERT(!taskQueues_.empty());  // no TaskQueues
141     size_t kineticSum = 0;
142     internal::SchedulableTaskQueueInterface *queue = nullptr;
143     for (const auto &idQueuePair : taskQueues_) {
144         std::tie(std::ignore, queue) = idQueuePair;
145         if (queue->IsEmpty()) {
146             continue;
147         }
148         kineticSum += queue->GetPriority();
149         kineticPriorities_[kineticSum] = queue;
150     }
151 }
152 
PutTasksInWorker(WorkerThread * worker)153 size_t TaskScheduler::PutTasksInWorker(WorkerThread *worker)
154 {
155     size_t taskCount = 0;
156     for (auto [id, size] : selectedQueues_) {
157         if (size == 0) {
158             continue;
159         }
160         auto addTaskFunc = [worker](Task &&task) { worker->AddTask(std::move(task)); };
161         size_t queueTaskCount = taskQueues_[id]->PopTasksToWorker(addTaskFunc, size);
162         taskCount += queueTaskCount;
163         LOG(DEBUG, RUNTIME) << "PutTasksInWorker: worker have gotten " << queueTaskCount << " tasks";
164     }
165     selectedQueues_.clear();
166     return taskCount;
167 }
168 
AreQueuesEmpty() const169 bool TaskScheduler::AreQueuesEmpty() const
170 {
171     internal::SchedulableTaskQueueInterface *queue = nullptr;
172     for (const auto &traitsQueuePair : taskQueues_) {
173         std::tie(std::ignore, queue) = traitsQueuePair;
174         if (!queue->IsEmpty()) {
175             return false;
176         }
177     }
178     return true;
179 }
180 
AreNoMoreTasks() const181 bool TaskScheduler::AreNoMoreTasks() const
182 {
183     return taskStatistics_->GetCountOfTaskInSystem() == 0;
184 }
185 
GetTaskFromQueue(TaskProperties properties)186 std::optional<Task> TaskScheduler::GetTaskFromQueue(TaskProperties properties)
187 {
188     LOG(DEBUG, RUNTIME) << "TaskScheduler: GetTaskFromQueue()";
189     os::memory::LockHolder popLockHolder(popFromTaskQueuesLock_);
190     internal::SchedulableTaskQueueInterface *queue = nullptr;
191     {
192         os::memory::LockHolder taskManagerLockHolder(taskSchedulerStateLock_);
193         auto taskQueuesIterator = taskQueues_.find({properties.GetTaskType(), properties.GetVMType()});
194         if (taskQueuesIterator == taskQueues_.end()) {
195             if (finish_) {
196                 return std::nullopt;
197             }
198             LOG(FATAL, COMMON) << "Attempt to take a task from a non-existent queue";
199         }
200         std::tie(std::ignore, queue) = *taskQueuesIterator;
201     }
202     if (!queue->HasTaskWithExecutionMode(properties.GetTaskExecutionMode())) {
203         return std::nullopt;
204     }
205     // Now we can pop the task from the chosen queue
206     auto task = queue->PopTask();
207     // Only after popping we can notify task statistics that task was POPPED from queue to get it outside. This sequence
208     // ensures that the number of tasks in the system (see TaskStatistics) will be correct.
209     taskStatistics_->IncrementCount(TaskStatus::POPPED, properties, 1);
210     if (taskStatistics_->GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
211         os::memory::LockHolder taskManagerLockHolder(taskSchedulerStateLock_);
212         finishTasksCondVar_.SignalAll();
213     }
214     return task;
215 }
216 
WaitForFinishAllTasksWithProperties(TaskProperties properties)217 void TaskScheduler::WaitForFinishAllTasksWithProperties(TaskProperties properties)
218 {
219     os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
220     while (taskStatistics_->GetCountOfTasksInSystemWithTaskProperties(properties) != 0) {
221         finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
222     }
223     LOG(DEBUG, RUNTIME) << "After waiting tasks with properties: " << properties
224                         << " {added: " << taskStatistics_->GetCount(TaskStatus::ADDED, properties)
225                         << ", executed: " << taskStatistics_->GetCount(TaskStatus::EXECUTED, properties)
226                         << ", popped: " << taskStatistics_->GetCount(TaskStatus::POPPED, properties) << "}";
227     taskStatistics_->ResetCountersWithTaskProperties(properties);
228 }
229 
Finalize()230 void TaskScheduler::Finalize()
231 {
232     ASSERT(start_);
233     {
234         // Wait all tasks will be done
235         os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
236         while (!AreNoMoreTasks()) {
237             finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
238         }
239         finish_ = true;
240         queuesWaitCondVar_.Signal();
241     }
242     for (auto worker : workers_) {
243         worker->Join();
244         delete worker;
245     }
246     taskStatistics_->ResetAllCounters();
247     LOG(DEBUG, RUNTIME) << "TaskScheduler: Finalized";
248 }
249 
IncrementCounterOfAddedTasks(TaskProperties properties,size_t ivalue,bool wasEmpty)250 void TaskScheduler::IncrementCounterOfAddedTasks(TaskProperties properties, size_t ivalue, bool wasEmpty)
251 {
252     taskStatistics_->IncrementCount(TaskStatus::ADDED, properties, ivalue);
253     if (wasEmpty) {
254         os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
255         queuesWaitCondVar_.Signal();
256     }
257 }
258 
IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap & counterMap)259 void TaskScheduler::IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap &counterMap)
260 {
261     for (const auto &[properties, count] : counterMap) {
262         taskStatistics_->IncrementCount(TaskStatus::EXECUTED, properties, count);
263         if (taskStatistics_->GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
264             os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
265             finishTasksCondVar_.SignalAll();
266         }
267     }
268 }
269 
~TaskScheduler()270 TaskScheduler::~TaskScheduler()
271 {
272     // We can delete TaskScheduler if it wasn't started or it was finished
273     ASSERT(start_ == finish_);
274     // Check if all task queue was deleted
275     ASSERT(taskQueues_.empty());
276     delete taskStatistics_;
277     LOG(DEBUG, RUNTIME) << "TaskScheduler: ~TaskScheduler: All threads finished jobs";
278 }
279 
280 }  // namespace panda::taskmanager
281