1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "libpandabase/taskmanager/task_scheduler.h"
17 #include "libpandabase/utils/logger.h"
18
19 namespace ark::taskmanager {
20
21 TaskScheduler *TaskScheduler::instance_ = nullptr;
22
TaskScheduler(size_t workersCount,TaskTimeStatsType taskTimeStatsType)23 TaskScheduler::TaskScheduler(size_t workersCount, TaskTimeStatsType taskTimeStatsType)
24 : workersCount_(workersCount), taskTimeStatsType_(taskTimeStatsType), selector_(taskQueues_)
25 {
26 switch (taskTimeStatsType) {
27 case TaskTimeStatsType::LIGHT_STATISTICS:
28 taskTimeStats_ = new internal::LightTaskTimeTimeStats(workersCount);
29 break;
30 case TaskTimeStatsType::NO_STATISTICS:
31 break;
32 default:
33 UNREACHABLE();
34 }
35 }
36
37 /* static */
Create(size_t threadsCount,TaskTimeStatsType taskTimeStatsType)38 TaskScheduler *TaskScheduler::Create(size_t threadsCount, TaskTimeStatsType taskTimeStatsType)
39 {
40 ASSERT(instance_ == nullptr);
41 ASSERT(threadsCount > 0);
42 instance_ = new TaskScheduler(threadsCount, taskTimeStatsType);
43 return instance_;
44 }
45
46 /* static */
GetTaskScheduler()47 TaskScheduler *TaskScheduler::GetTaskScheduler()
48 {
49 return instance_;
50 }
51
52 /* static */
Destroy()53 void TaskScheduler::Destroy()
54 {
55 ASSERT(instance_ != nullptr);
56 delete instance_;
57 instance_ = nullptr;
58 }
59
RegisterQueue(internal::SchedulableTaskQueueInterface * queue)60 TaskQueueId TaskScheduler::RegisterQueue(internal::SchedulableTaskQueueInterface *queue)
61 {
62 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
63 ASSERT(!start_);
64 LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: Register task queue with {" << queue->GetTaskType() << ", "
65 << queue->GetVMType() << "}";
66 TaskQueueId id(queue->GetTaskType(), queue->GetVMType());
67 if (UNLIKELY(taskQueues_.find(id) != taskQueues_.end())) {
68 return INVALID_TASKQUEUE_ID;
69 }
70 taskQueues_[id] = queue;
71 queue->SetCallbacks(
72 [this](TaskProperties properties, size_t count) { this->IncrementCounterOfAddedTasks(properties, count); },
73 [this]() { this->SignalWorkers(); });
74
75 // init countOfTasksIsSystem_ for possible task from registered queue
76 countOfTasksInSystem_[{queue->GetTaskType(), queue->GetVMType(), TaskExecutionMode::FOREGROUND}] = 0U;
77 countOfTasksInSystem_[{queue->GetTaskType(), queue->GetVMType(), TaskExecutionMode::BACKGROUND}] = 0U;
78
79 return id;
80 }
81
Initialize()82 void TaskScheduler::Initialize()
83 {
84 ASSERT(!start_);
85 selector_.Init();
86 start_ = true;
87 LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: creates " << workersCount_ << " threads";
88 // Starts all workers
89 for (size_t i = 0; i < workersCount_; i++) {
90 workers_.push_back(new WorkerThread("TSWorker_" + std::to_string(i + 1UL)));
91 LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: created thread with name " << workers_.back()->GetWorkerName();
92 }
93 // Set names of workers and get them info about other ones
94 for (auto *worker : workers_) {
95 worker->RegisterAllWorkersInLocalQueue(workers_);
96 }
97 // Start worker loop executing
98 for (auto *worker : workers_) {
99 worker->Start();
100 }
101 // Atomic with release order reason: other thread should see last value
102 disableHelpers_.store(false, std::memory_order_release);
103 }
104
StealTaskFromOtherWorker(WorkerThread * taskReceiver)105 void TaskScheduler::StealTaskFromOtherWorker(WorkerThread *taskReceiver)
106 {
107 // WorkerThread tries to find Worker with the most tasks
108 auto chosenWorker =
109 *std::max_element(workers_.begin(), workers_.end(),
110 [](const WorkerThread *lv, const WorkerThread *rv) { return lv->Size() < rv->Size(); });
111 if (chosenWorker->Size() == 0) {
112 return;
113 }
114 // If worker was successfully found, steals task from its local queue
115 chosenWorker->GiveTasksToAnotherWorker(
116 [taskReceiver](Task &&task) { taskReceiver->SetStolenTask(std::move(task)); }, 1UL,
117 chosenWorker->GetLocalWorkerQueuePopId(taskReceiver));
118 }
119
FillWithTasks(WorkerThread * worker)120 bool TaskScheduler::FillWithTasks(WorkerThread *worker)
121 {
122 ASSERT(start_);
123 std::queue<Task> readyTasks;
124 {
125 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
126 // We increment counter of waiters to signal them in future
127 if (AreQueuesEmpty() && !waitList_.HaveReadyValue()) {
128 if (!WaitUntilNewTasks()) {
129 return true;
130 }
131 }
132 if (!waitList_.HaveReadyValue()) {
133 // Use selector to choose next queue to pop task
134 auto selectedQueue = selector_.SelectQueue();
135 ASSERT(selectedQueue != INVALID_TASKQUEUE_ID);
136 // Getting task from selected queue
137 PutTasksInWorker(worker, selectedQueue);
138 return false;
139 }
140 // Wait list have task that should be added in queues. So worker firstly should add this task in local queue
141 // under lock-holder. Next in should add them in TaskQueues
142 PutWaitTaskInLocalQueue(readyTasks);
143 }
144 // Worker puts tasks to TaskQueues without lock-holder to avoid unnecessary waiting of other workers
145 PutTaskInTaskQueues(readyTasks);
146 return false;
147 }
148
WaitUntilNewTasks()149 bool TaskScheduler::WaitUntilNewTasks()
150 {
151 // Atomic with acq_rel order reason: sync for counter
152 waitWorkersCount_.fetch_add(1, std::memory_order_acq_rel);
153 while (AreQueuesEmpty() && !waitList_.HaveReadyValue() && !finish_) {
154 queuesWaitCondVar_.TimedWait(&taskSchedulerStateLock_, TASK_WAIT_TIMEOUT);
155 }
156 // Atomic with acq_rel order reason: sync for counter
157 waitWorkersCount_.fetch_sub(1, std::memory_order_acq_rel);
158 return !finish_;
159 }
160
PutTasksInWorker(WorkerThread * worker,TaskQueueId selectedQueue)161 size_t TaskScheduler::PutTasksInWorker(WorkerThread *worker, TaskQueueId selectedQueue)
162 {
163 auto addTaskFunc = [worker](Task &&task) { worker->AddTask(std::move(task)); };
164 auto queue = taskQueues_[selectedQueue];
165
166 // Now we calc how many task we want to get from queue. If there are few tasks, then we want them to be evenly
167 // distributed among the workers.
168 size_t size = queue->Size();
169 size_t countToGet = size / workers_.size();
170 countToGet = (countToGet >= WorkerThread::WORKER_QUEUE_SIZE) ? WorkerThread::WORKER_QUEUE_SIZE
171 : (size % workers_.size() == 0) ? countToGet
172 : countToGet + 1;
173 // Firstly we use method to delete retired ptrs
174 worker->TryDeleteRetiredPtrs();
175 // Execute popping task form queue
176 size_t queueTaskCount = queue->PopTasksToWorker(addTaskFunc, countToGet);
177 LOG(DEBUG, TASK_MANAGER) << worker->GetWorkerName() << ": get tasks " << queueTaskCount << "; ";
178 return queueTaskCount;
179 }
180
AreQueuesEmpty() const181 bool TaskScheduler::AreQueuesEmpty() const
182 {
183 for ([[maybe_unused]] const auto &[id, queue] : taskQueues_) {
184 ASSERT(queue != nullptr);
185 if (!queue->IsEmpty()) {
186 return false;
187 }
188 }
189 return true;
190 }
191
AreWorkersEmpty() const192 bool TaskScheduler::AreWorkersEmpty() const
193 {
194 for (auto *worker : workers_) {
195 if (!worker->IsEmpty()) {
196 return false;
197 }
198 }
199 return true;
200 }
201
AreNoMoreTasks() const202 bool TaskScheduler::AreNoMoreTasks() const
203 {
204 return GetCountOfTasksInSystem() == 0;
205 }
206
HelpWorkersWithTasks(TaskProperties properties)207 size_t TaskScheduler::HelpWorkersWithTasks(TaskProperties properties)
208 {
209 // Atomic with acquire order reason: getting correct value
210 if (disableHelpers_.load(std::memory_order_acquire)) {
211 return 0;
212 }
213 size_t executedTasksCount = 0;
214 auto *queue = GetQueue({properties.GetTaskType(), properties.GetVMType()});
215 if (queue->HasTaskWithExecutionMode(properties.GetTaskExecutionMode())) {
216 executedTasksCount = GetAndExecuteSetOfTasksFromQueue(properties);
217 } else if (LIKELY(!workers_.empty())) {
218 executedTasksCount = StealAndExecuteOneTaskFromWorkers(properties);
219 }
220 if (UNLIKELY(executedTasksCount == 0)) {
221 LOG(DEBUG, TASK_MANAGER) << "Helper: got no tasks;";
222 return 0;
223 }
224 LOG(DEBUG, TASK_MANAGER) << "Helper: executed tasks: " << executedTasksCount << ";";
225 DecrementCountOfTasksInSystem(properties, executedTasksCount);
226
227 // Atomic with acquire order reason: get correct value
228 auto waitToFinish = waitToFinish_.load(std::memory_order_acquire);
229 if (waitToFinish > 0 && GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
230 os::memory::LockHolder taskManagerLockHolder(taskSchedulerStateLock_);
231 finishTasksCondVar_.SignalAll();
232 }
233 return executedTasksCount;
234 }
235
GetAndExecuteSetOfTasksFromQueue(TaskProperties properties)236 size_t TaskScheduler::GetAndExecuteSetOfTasksFromQueue(TaskProperties properties)
237 {
238 auto *queue = GetQueue({properties.GetTaskType(), properties.GetVMType()});
239 if (queue->IsEmpty()) {
240 return 0;
241 }
242
243 std::queue<Task> taskQueue;
244 size_t realCount = 0;
245 {
246 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
247 size_t size = queue->CountOfTasksWithExecutionMode(properties.GetTaskExecutionMode());
248 size_t countToGet = size / (workers_.size() + 1);
249 countToGet = (countToGet >= WorkerThread::WORKER_QUEUE_SIZE) ? WorkerThread::WORKER_QUEUE_SIZE
250 : (size % (workers_.size() + 1) == 0) ? countToGet
251 : countToGet + 1;
252 realCount = queue->PopTasksToHelperThread([&taskQueue](Task &&task) { taskQueue.push(std::move(task)); },
253 countToGet, properties.GetTaskExecutionMode());
254 }
255 while (!taskQueue.empty()) {
256 taskQueue.front().RunTask();
257 taskQueue.pop();
258 }
259 return realCount;
260 }
261
StealAndExecuteOneTaskFromWorkers(TaskProperties properties)262 size_t TaskScheduler::StealAndExecuteOneTaskFromWorkers(TaskProperties properties)
263 {
264 ASSERT(!workers_.empty());
265 std::queue<Task> taskQueue;
266 auto addTaskToQueue = [&taskQueue](Task &&task) { taskQueue.push(std::move(task)); };
267 auto chosenWorker = *std::max_element(
268 workers_.begin(), workers_.end(), [&properties](const WorkerThread *lv, const WorkerThread *rv) {
269 return lv->CountOfTasksWithProperties(properties) < rv->CountOfTasksWithProperties(properties);
270 });
271 if (chosenWorker->CountOfTasksWithProperties(properties) == 0) {
272 return 0;
273 }
274
275 auto stolen = chosenWorker->GiveTasksToAnotherWorker(addTaskToQueue, 1UL,
276 chosenWorker->GetLocalWorkerQueueSchedulerPopId(), properties);
277
278 while (!taskQueue.empty()) {
279 taskQueue.front().RunTask();
280 taskQueue.pop();
281 }
282 return stolen;
283 }
284
WaitForFinishAllTasksWithProperties(TaskProperties properties)285 void TaskScheduler::WaitForFinishAllTasksWithProperties(TaskProperties properties)
286 {
287 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
288 // Atomic with acq_rel order reason: other thread should see correct value
289 waitToFinish_.fetch_add(1, std::memory_order_acq_rel);
290 while (GetCountOfTasksInSystemWithTaskProperties(properties) != 0) {
291 finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
292 }
293 // Atomic with acq_rel order reason: other thread should see correct value
294 waitToFinish_.fetch_sub(1, std::memory_order_acq_rel);
295 }
296
Finalize()297 void TaskScheduler::Finalize()
298 {
299 ASSERT(start_);
300 {
301 // Wait all tasks will be done
302 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
303 // Atomic with acq_rel order reason: other thread should
304 // see correct value
305 waitToFinish_.fetch_add(1, std::memory_order_acq_rel);
306 while (!AreNoMoreTasks()) {
307 finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
308 }
309 finish_ = true;
310 // Atomic with release order reason: other thread should see last value
311 disableHelpers_.store(true, std::memory_order_release);
312 // Atomic with acq_rel order reason: other thread should
313 // see correct value
314 waitToFinish_.fetch_sub(1, std::memory_order_acq_rel);
315 queuesWaitCondVar_.SignalAll();
316 }
317 for (auto *worker : workers_) {
318 worker->Join();
319 }
320 for (auto *worker : workers_) {
321 delete worker;
322 }
323
324 if (IsTaskLifetimeStatisticsUsed()) {
325 for (const auto &line : taskTimeStats_->GetTaskStatistics()) {
326 LOG(INFO, TASK_MANAGER) << line;
327 }
328 }
329 LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: Finalized";
330 }
331
IncrementCounterOfAddedTasks(TaskProperties properties,size_t ivalue)332 void TaskScheduler::IncrementCounterOfAddedTasks(TaskProperties properties, size_t ivalue)
333 {
334 IncrementCountOfTasksInSystem(properties, ivalue);
335 }
336
IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap & counterMap)337 size_t TaskScheduler::IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap &counterMap)
338 {
339 size_t countOfTasks = 0;
340 for (const auto &[properties, count] : counterMap) {
341 countOfTasks += count;
342 DecrementCountOfTasksInSystem(properties, count);
343 // Atomic with acquire order reason: get correct value
344 auto waitToFinish = waitToFinish_.load(std::memory_order_acquire);
345 if (waitToFinish > 0 && GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
346 os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
347 finishTasksCondVar_.SignalAll();
348 }
349 }
350 return countOfTasks;
351 }
352
SignalWorkers()353 void TaskScheduler::SignalWorkers()
354 {
355 // Atomic with acquire order reason: get correct value
356 if (waitWorkersCount_.load(std::memory_order_acquire) > 0) {
357 os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
358 queuesWaitCondVar_.Signal();
359 }
360 }
361
GetQueue(TaskQueueId id) const362 internal::SchedulableTaskQueueInterface *TaskScheduler::GetQueue(TaskQueueId id) const
363 {
364 internal::SchedulableTaskQueueInterface *queue = nullptr;
365 auto taskQueuesIterator = taskQueues_.find(id);
366 if (taskQueuesIterator == taskQueues_.end()) {
367 LOG(FATAL, COMMON) << "Attempt to take a task from a non-existent queue";
368 }
369 std::tie(std::ignore, queue) = *taskQueuesIterator;
370 return queue;
371 }
372
AddTaskToWaitListWithTimeout(Task && task,uint64_t time)373 WaiterId TaskScheduler::AddTaskToWaitListWithTimeout(Task &&task, uint64_t time)
374 {
375 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
376 this->IncrementCounterOfAddedTasks(task.GetTaskProperties(), 1U);
377 return waitList_.AddValueToWait(std::move(task), time);
378 }
379
AddTaskToWaitList(Task && task)380 WaiterId TaskScheduler::AddTaskToWaitList(Task &&task)
381 {
382 // Use adding with max time as possible, wait list will understand that it should set max possible time
383 return AddTaskToWaitListWithTimeout(std::move(task), std::numeric_limits<uint64_t>().max());
384 }
385
PutWaitTaskInLocalQueue(LocalTaskQueue & queue)386 void TaskScheduler::PutWaitTaskInLocalQueue(LocalTaskQueue &queue)
387 {
388 for (auto task = waitList_.GetReadyValue(); task.has_value(); task = waitList_.GetReadyValue()) {
389 queue.push(std::move(task.value()));
390 }
391 }
392
PutTaskInTaskQueues(LocalTaskQueue & queue)393 void TaskScheduler::PutTaskInTaskQueues(LocalTaskQueue &queue)
394 {
395 while (!queue.empty()) {
396 Task task = std::move(queue.front());
397 queue.pop();
398 auto prop = task.GetTaskProperties();
399 auto *taskQueue = GetQueue({prop.GetTaskType(), prop.GetVMType()});
400 taskQueue->AddTaskWithoutNewTaskCallbackExecution(std::move(task));
401 }
402 }
403
SignalWaitList(WaiterId waiterId)404 void TaskScheduler::SignalWaitList(WaiterId waiterId)
405 {
406 std::optional<Task> task;
407 {
408 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
409 task = waitList_.GetValueById(waiterId);
410 }
411 if (!task.has_value()) {
412 return;
413 }
414 auto prop = task->GetTaskProperties();
415 auto *queue = GetQueue({prop.GetTaskType(), prop.GetVMType()});
416 queue->AddTaskWithoutNewTaskCallbackExecution(std::move(task.value()));
417 }
418
~TaskScheduler()419 TaskScheduler::~TaskScheduler()
420 {
421 // We can delete TaskScheduler if it wasn't started or it was finished
422 ASSERT(start_ == finish_);
423 // Check if all task queue was deleted
424 ASSERT(taskQueues_.empty());
425 delete taskTimeStats_;
426 }
427
IncrementCountOfTasksInSystem(TaskProperties prop,size_t count)428 void TaskScheduler::IncrementCountOfTasksInSystem(TaskProperties prop, size_t count)
429 {
430 // Atomic with acq_rel order reason: fast add count to countOfTasksInSystem_[prop]
431 countOfTasksInSystem_[prop].fetch_add(count, std::memory_order_acq_rel);
432 }
433
DecrementCountOfTasksInSystem(TaskProperties prop,size_t count)434 void TaskScheduler::DecrementCountOfTasksInSystem(TaskProperties prop, size_t count)
435 {
436 // Atomic with acq_rel order reason: fast sub count to countOfTasksInSystem_[prop]
437 countOfTasksInSystem_[prop].fetch_sub(count, std::memory_order_acq_rel);
438 }
439
GetCountOfTasksInSystemWithTaskProperties(TaskProperties prop) const440 size_t TaskScheduler::GetCountOfTasksInSystemWithTaskProperties(TaskProperties prop) const
441 {
442 // Atomic with acquire order reason: need to sync with all prev fetch_adds and fetch_subs
443 return countOfTasksInSystem_.at(prop).load(std::memory_order_acquire);
444 }
445
GetCountOfTasksInSystem() const446 size_t TaskScheduler::GetCountOfTasksInSystem() const
447 {
448 size_t sumCount = 0;
449 for ([[maybe_unused]] const auto &[prop, counter] : countOfTasksInSystem_) {
450 // Atomic with acquire order reason: need to sync with all prev fetch_adds and fetch_subs
451 sumCount += counter.load(std::memory_order_acquire);
452 }
453 return sumCount;
454 }
455
IsTaskLifetimeStatisticsUsed() const456 bool TaskScheduler::IsTaskLifetimeStatisticsUsed() const
457 {
458 return taskTimeStatsType_ != TaskTimeStatsType::NO_STATISTICS;
459 }
460
GetTaskTimeStats() const461 TaskTimeStatsBase *TaskScheduler::GetTaskTimeStats() const
462 {
463 return taskTimeStats_;
464 }
465
466 } // namespace ark::taskmanager
467