1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "libpandabase/taskmanager/task_scheduler.h"
17 #include "libpandabase/utils/logger.h"
18
19 namespace ark::taskmanager {
20
21 TaskScheduler *TaskScheduler::instance_ = nullptr;
22
TaskScheduler(size_t workersCount,TaskTimeStatsType taskTimeStatsType)23 TaskScheduler::TaskScheduler(size_t workersCount, TaskTimeStatsType taskTimeStatsType)
24 : workersCount_(workersCount), taskTimeStatsType_(taskTimeStatsType), selector_(taskQueues_)
25 {
26 switch (taskTimeStatsType) {
27 case TaskTimeStatsType::LIGHT_STATISTICS:
28 taskTimeStats_ = new internal::LightTaskTimeTimeStats(workersCount);
29 break;
30 case TaskTimeStatsType::NO_STATISTICS:
31 break;
32 default:
33 UNREACHABLE();
34 }
35 }
36
37 /* static */
Create(size_t threadsCount,TaskTimeStatsType taskTimeStatsType)38 TaskScheduler *TaskScheduler::Create(size_t threadsCount, TaskTimeStatsType taskTimeStatsType)
39 {
40 ASSERT(instance_ == nullptr);
41 ASSERT(threadsCount > 0);
42 instance_ = new TaskScheduler(threadsCount, taskTimeStatsType);
43 return instance_;
44 }
45
46 /* static */
GetTaskScheduler()47 TaskScheduler *TaskScheduler::GetTaskScheduler()
48 {
49 return instance_;
50 }
51
52 /* static */
Destroy()53 void TaskScheduler::Destroy()
54 {
55 ASSERT(instance_ != nullptr);
56 delete instance_;
57 instance_ = nullptr;
58 }
59
RegisterQueue(internal::SchedulableTaskQueueInterface * queue)60 TaskQueueId TaskScheduler::RegisterQueue(internal::SchedulableTaskQueueInterface *queue)
61 {
62 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
63 ASSERT(!start_);
64 LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: Register task queue with {" << queue->GetTaskType() << ", "
65 << queue->GetVMType() << "}";
66 TaskQueueId id(queue->GetTaskType(), queue->GetVMType());
67 if (UNLIKELY(taskQueues_.find(id) != taskQueues_.end())) {
68 return INVALID_TASKQUEUE_ID;
69 }
70 taskQueues_[id] = queue;
71 queue->SetCallbacks(
72 [this](TaskProperties properties, size_t count) { this->IncrementCounterOfAddedTasks(properties, count); },
73 [this]() { this->SignalWorkers(); });
74
75 // init countOfTasksIsSystem_ for possible task from registered queue
76 countOfTasksInSystem_[{queue->GetTaskType(), queue->GetVMType(), TaskExecutionMode::FOREGROUND}] = 0U;
77 countOfTasksInSystem_[{queue->GetTaskType(), queue->GetVMType(), TaskExecutionMode::BACKGROUND}] = 0U;
78
79 return id;
80 }
81
Initialize()82 void TaskScheduler::Initialize()
83 {
84 ASSERT(!start_);
85 selector_.Init();
86 start_ = true;
87 LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: creates " << workersCount_ << " threads";
88 // Starts all workers
89 for (size_t i = 0; i < workersCount_; i++) {
90 workers_.push_back(new WorkerThread("TSWorker_" + std::to_string(i + 1UL)));
91 LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: created thread with name " << workers_.back()->GetWorkerName();
92 }
93 // Set names of workers and get them info about other ones
94 for (auto *worker : workers_) {
95 worker->RegisterAllWorkersInLocalQueue(workers_);
96 }
97 // Start worker loop executing
98 for (auto *worker : workers_) {
99 worker->Start();
100 }
101 // Atomic with release order reason: other thread should see last value
102 disableHelpers_.store(false, std::memory_order_release);
103 }
104
StealTaskFromOtherWorker(WorkerThread * taskReceiver)105 void TaskScheduler::StealTaskFromOtherWorker(WorkerThread *taskReceiver)
106 {
107 // WorkerThread tries to find Worker with the most tasks
108 auto chosenWorker =
109 *std::max_element(workers_.begin(), workers_.end(),
110 [](const WorkerThread *lv, const WorkerThread *rv) { return lv->Size() < rv->Size(); });
111 if (chosenWorker->Size() == 0) {
112 return;
113 }
114 // If worker was successfully found, steals task from its local queue
115 chosenWorker->GiveTasksToAnotherWorker(
116 [taskReceiver](Task &&task) { taskReceiver->SetStolenTask(std::move(task)); }, 1UL,
117 chosenWorker->GetLocalWorkerQueuePopId(taskReceiver));
118 }
119
FillWithTasks(WorkerThread * worker)120 bool TaskScheduler::FillWithTasks(WorkerThread *worker)
121 {
122 ASSERT(start_);
123 std::queue<Task> readyTasks;
124 {
125 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
126 // We increment counter of waiters to signal them in future
127 if (AreQueuesEmpty() && !waitList_.HaveReadyValue()) {
128 if (!WaitUntilNewTasks()) {
129 return true;
130 }
131 }
132 if (!waitList_.HaveReadyValue()) {
133 // Use selector to choose next queue to pop task
134 auto selectedQueue = selector_.SelectQueue();
135 ASSERT(selectedQueue != INVALID_TASKQUEUE_ID);
136 // Getting task from selected queue
137 PutTasksInWorker(worker, selectedQueue);
138 return false;
139 }
140 // Wait list have task that should be added in queues. So worker firstly should add this task in local queue
141 // under lock-holder. Next in should add them in TaskQueues
142 PutWaitTaskInLocalQueue(readyTasks);
143 }
144 // Worker puts tasks to TaskQueues without lock-holder to avoid unnecessary waiting of other workers
145 PutTaskInTaskQueues(readyTasks);
146 return false;
147 }
148
WaitUntilNewTasks()149 bool TaskScheduler::WaitUntilNewTasks()
150 {
151 // Atomic with acq_rel order reason: sync for counter
152 waitWorkersCount_.fetch_add(1, std::memory_order_acq_rel);
153 while (AreQueuesEmpty() && !waitList_.HaveReadyValue() && !finish_) {
154 queuesWaitCondVar_.TimedWait(&taskSchedulerStateLock_, TASK_WAIT_TIMEOUT);
155 }
156 // Atomic with acq_rel order reason: sync for counter
157 waitWorkersCount_.fetch_sub(1, std::memory_order_acq_rel);
158 return !finish_;
159 }
160
PutTasksInWorker(WorkerThread * worker,TaskQueueId selectedQueue)161 size_t TaskScheduler::PutTasksInWorker(WorkerThread *worker, TaskQueueId selectedQueue)
162 {
163 auto addTaskFunc = [worker](Task &&task) { worker->AddTask(std::move(task)); };
164 auto queue = taskQueues_[selectedQueue];
165
166 // Now we calc how many task we want to get from queue. If there are few tasks, then we want them to be evenly
167 // distributed among the workers.
168 size_t size = queue->Size();
169 size_t countToGet = size / workers_.size();
170 countToGet = (countToGet >= WorkerThread::WORKER_QUEUE_SIZE) ? WorkerThread::WORKER_QUEUE_SIZE
171 : (size % workers_.size() == 0) ? countToGet
172 : countToGet + 1;
173 // Firstly we use method to delete retired ptrs
174 worker->TryDeleteRetiredPtrs();
175 // Execute popping task form queue
176 size_t queueTaskCount = queue->PopTasksToWorker(addTaskFunc, countToGet);
177 LOG(DEBUG, TASK_MANAGER) << worker->GetWorkerName() << ": get tasks " << queueTaskCount << "; ";
178 return queueTaskCount;
179 }
180
AreQueuesEmpty() const181 bool TaskScheduler::AreQueuesEmpty() const
182 {
183 for ([[maybe_unused]] const auto &[id, queue] : taskQueues_) {
184 ASSERT(queue != nullptr);
185 if (!queue->IsEmpty()) {
186 return false;
187 }
188 }
189 return true;
190 }
191
AreWorkersEmpty() const192 bool TaskScheduler::AreWorkersEmpty() const
193 {
194 for (auto *worker : workers_) {
195 if (!worker->IsEmpty()) {
196 return false;
197 }
198 }
199 return true;
200 }
201
AreNoMoreTasks() const202 bool TaskScheduler::AreNoMoreTasks() const
203 {
204 return GetCountOfTasksInSystem() == 0;
205 }
206
HelpWorkersWithTasks(TaskProperties properties)207 size_t TaskScheduler::HelpWorkersWithTasks(TaskProperties properties)
208 {
209 // Atomic with acquire order reason: getting correct value
210 if (disableHelpers_.load(std::memory_order_acquire)) {
211 return 0;
212 }
213 size_t executedTasksCount = 0;
214 auto *queue = GetQueue({properties.GetTaskType(), properties.GetVMType()});
215 if (queue->HasTaskWithExecutionMode(properties.GetTaskExecutionMode())) {
216 executedTasksCount = GetAndExecuteSetOfTasksFromQueue(properties);
217 } else if (LIKELY(!workers_.empty())) {
218 executedTasksCount = StealAndExecuteOneTaskFromWorkers(properties);
219 }
220 if (UNLIKELY(executedTasksCount == 0)) {
221 LOG(DEBUG, TASK_MANAGER) << "Helper: got no tasks;";
222 return 0;
223 }
224 LOG(DEBUG, TASK_MANAGER) << "Helper: executed tasks: " << executedTasksCount << ";";
225 DecrementCountOfTasksInSystem(properties, executedTasksCount);
226
227 // Atomic with acquire order reason: get correct value
228 auto waitToFinish = waitToFinish_.load(std::memory_order_acquire);
229 if (waitToFinish > 0 && GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
230 os::memory::LockHolder taskManagerLockHolder(taskSchedulerStateLock_);
231 finishTasksCondVar_.SignalAll();
232 }
233 return executedTasksCount;
234 }
235
GetAndExecuteSetOfTasksFromQueue(TaskProperties properties)236 size_t TaskScheduler::GetAndExecuteSetOfTasksFromQueue(TaskProperties properties)
237 {
238 auto *queue = GetQueue({properties.GetTaskType(), properties.GetVMType()});
239 if (queue->IsEmpty()) {
240 return 0;
241 }
242
243 std::queue<Task> taskQueue;
244 size_t realCount = 0;
245 {
246 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
247 size_t size = queue->CountOfTasksWithExecutionMode(properties.GetTaskExecutionMode());
248 size_t countToGet = size / (workers_.size() + 1);
249 countToGet = (countToGet >= WorkerThread::WORKER_QUEUE_SIZE) ? WorkerThread::WORKER_QUEUE_SIZE
250 : (size % (workers_.size() + 1) == 0) ? countToGet
251 : countToGet + 1;
252 realCount = queue->PopTasksToHelperThread([&taskQueue](Task &&task) { taskQueue.push(std::move(task)); },
253 countToGet, properties.GetTaskExecutionMode());
254 }
255 while (!taskQueue.empty()) {
256 taskQueue.front().RunTask();
257 taskQueue.pop();
258 }
259 return realCount;
260 }
261
StealAndExecuteOneTaskFromWorkers(TaskProperties properties)262 size_t TaskScheduler::StealAndExecuteOneTaskFromWorkers(TaskProperties properties)
263 {
264 ASSERT(!workers_.empty());
265 std::queue<Task> taskQueue;
266 auto addTaskToQueue = [&taskQueue](Task &&task) { taskQueue.push(std::move(task)); };
267 while (true) {
268 auto chosenWorker = *std::max_element(
269 workers_.begin(), workers_.end(), [&properties](const WorkerThread *lv, const WorkerThread *rv) {
270 return lv->CountOfTasksWithProperties(properties) < rv->CountOfTasksWithProperties(properties);
271 });
272 if UNLIKELY (chosenWorker->CountOfTasksWithProperties(properties) == 0) {
273 return 0;
274 }
275
276 auto stolen = chosenWorker->GiveTasksToAnotherWorker(
277 addTaskToQueue, 1UL, chosenWorker->GetLocalWorkerQueueSchedulerPopId(), properties);
278 if (stolen == 0) { // check if stealing was successful
279 // if we did not stole we should retry
280 continue;
281 }
282 while (!taskQueue.empty()) {
283 taskQueue.front().RunTask();
284 taskQueue.pop();
285 }
286 return stolen;
287 }
288 }
289
WaitForFinishAllTasksWithProperties(TaskProperties properties)290 void TaskScheduler::WaitForFinishAllTasksWithProperties(TaskProperties properties)
291 {
292 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
293 // Atomic with acq_rel order reason: other thread should see correct value
294 waitToFinish_.fetch_add(1, std::memory_order_acq_rel);
295 while (GetCountOfTasksInSystemWithTaskProperties(properties) != 0) {
296 finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
297 }
298 // Atomic with acq_rel order reason: other thread should see correct value
299 waitToFinish_.fetch_sub(1, std::memory_order_acq_rel);
300 }
301
Finalize()302 void TaskScheduler::Finalize()
303 {
304 ASSERT(start_);
305 {
306 // Wait all tasks will be done
307 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
308 // Atomic with acq_rel order reason: other thread should
309 // see correct value
310 waitToFinish_.fetch_add(1, std::memory_order_acq_rel);
311 while (!AreNoMoreTasks()) {
312 finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
313 }
314 finish_ = true;
315 // Atomic with release order reason: other thread should see last value
316 disableHelpers_.store(true, std::memory_order_release);
317 // Atomic with acq_rel order reason: other thread should
318 // see correct value
319 waitToFinish_.fetch_sub(1, std::memory_order_acq_rel);
320 queuesWaitCondVar_.SignalAll();
321 }
322 for (auto *worker : workers_) {
323 worker->Join();
324 }
325 for (auto *worker : workers_) {
326 delete worker;
327 }
328
329 if (IsTaskLifetimeStatisticsUsed()) {
330 for (const auto &line : taskTimeStats_->GetTaskStatistics()) {
331 LOG(INFO, TASK_MANAGER) << line;
332 }
333 }
334 LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: Finalized";
335 }
336
IncrementCounterOfAddedTasks(TaskProperties properties,size_t ivalue)337 void TaskScheduler::IncrementCounterOfAddedTasks(TaskProperties properties, size_t ivalue)
338 {
339 IncrementCountOfTasksInSystem(properties, ivalue);
340 }
341
IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap & counterMap)342 size_t TaskScheduler::IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap &counterMap)
343 {
344 size_t countOfTasks = 0;
345 for (const auto &[properties, count] : counterMap) {
346 countOfTasks += count;
347 DecrementCountOfTasksInSystem(properties, count);
348 // Atomic with acquire order reason: get correct value
349 auto waitToFinish = waitToFinish_.load(std::memory_order_acquire);
350 if (waitToFinish > 0 && GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
351 os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
352 finishTasksCondVar_.SignalAll();
353 }
354 }
355 return countOfTasks;
356 }
357
SignalWorkers()358 void TaskScheduler::SignalWorkers()
359 {
360 // Atomic with acquire order reason: get correct value
361 if (waitWorkersCount_.load(std::memory_order_acquire) > 0) {
362 os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
363 queuesWaitCondVar_.Signal();
364 }
365 }
366
GetQueue(TaskQueueId id) const367 internal::SchedulableTaskQueueInterface *TaskScheduler::GetQueue(TaskQueueId id) const
368 {
369 internal::SchedulableTaskQueueInterface *queue = nullptr;
370 auto taskQueuesIterator = taskQueues_.find(id);
371 if (taskQueuesIterator == taskQueues_.end()) {
372 LOG(FATAL, COMMON) << "Attempt to take a task from a non-existent queue";
373 }
374 std::tie(std::ignore, queue) = *taskQueuesIterator;
375 return queue;
376 }
377
AddTaskToWaitListWithTimeout(Task && task,uint64_t time)378 WaiterId TaskScheduler::AddTaskToWaitListWithTimeout(Task &&task, uint64_t time)
379 {
380 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
381 this->IncrementCounterOfAddedTasks(task.GetTaskProperties(), 1U);
382 return waitList_.AddValueToWait(std::move(task), time);
383 }
384
AddTaskToWaitList(Task && task)385 WaiterId TaskScheduler::AddTaskToWaitList(Task &&task)
386 {
387 // Use adding with max time as possible, wait list will understand that it should set max possible time
388 return AddTaskToWaitListWithTimeout(std::move(task), std::numeric_limits<uint64_t>().max());
389 }
390
PutWaitTaskInLocalQueue(LocalTaskQueue & queue)391 void TaskScheduler::PutWaitTaskInLocalQueue(LocalTaskQueue &queue)
392 {
393 for (auto task = waitList_.GetReadyValue(); task.has_value(); task = waitList_.GetReadyValue()) {
394 queue.push(std::move(task.value()));
395 }
396 }
397
PutTaskInTaskQueues(LocalTaskQueue & queue)398 void TaskScheduler::PutTaskInTaskQueues(LocalTaskQueue &queue)
399 {
400 while (!queue.empty()) {
401 Task task = std::move(queue.front());
402 queue.pop();
403 auto prop = task.GetTaskProperties();
404 auto *taskQueue = GetQueue({prop.GetTaskType(), prop.GetVMType()});
405 taskQueue->AddTaskWithoutNewTaskCallbackExecution(std::move(task));
406 }
407 }
408
SignalWaitList(WaiterId waiterId)409 void TaskScheduler::SignalWaitList(WaiterId waiterId)
410 {
411 std::optional<Task> task;
412 {
413 os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
414 task = waitList_.GetValueById(waiterId);
415 }
416 if (!task.has_value()) {
417 return;
418 }
419 auto prop = task->GetTaskProperties();
420 auto *queue = GetQueue({prop.GetTaskType(), prop.GetVMType()});
421 queue->AddTaskWithoutNewTaskCallbackExecution(std::move(task.value()));
422 }
423
~TaskScheduler()424 TaskScheduler::~TaskScheduler()
425 {
426 // We can delete TaskScheduler if it wasn't started or it was finished
427 ASSERT(start_ == finish_);
428 // Check if all task queue was deleted
429 ASSERT(taskQueues_.empty());
430 delete taskTimeStats_;
431 }
432
IncrementCountOfTasksInSystem(TaskProperties prop,size_t count)433 void TaskScheduler::IncrementCountOfTasksInSystem(TaskProperties prop, size_t count)
434 {
435 // Atomic with acq_rel order reason: fast add count to countOfTasksInSystem_[prop]
436 countOfTasksInSystem_[prop].fetch_add(count, std::memory_order_acq_rel);
437 }
438
DecrementCountOfTasksInSystem(TaskProperties prop,size_t count)439 void TaskScheduler::DecrementCountOfTasksInSystem(TaskProperties prop, size_t count)
440 {
441 // Atomic with acq_rel order reason: fast sub count to countOfTasksInSystem_[prop]
442 countOfTasksInSystem_[prop].fetch_sub(count, std::memory_order_acq_rel);
443 }
444
GetCountOfTasksInSystemWithTaskProperties(TaskProperties prop) const445 size_t TaskScheduler::GetCountOfTasksInSystemWithTaskProperties(TaskProperties prop) const
446 {
447 // Atomic with acquire order reason: need to sync with all prev fetch_adds and fetch_subs
448 return countOfTasksInSystem_.at(prop).load(std::memory_order_acquire);
449 }
450
GetCountOfTasksInSystem() const451 size_t TaskScheduler::GetCountOfTasksInSystem() const
452 {
453 size_t sumCount = 0;
454 for ([[maybe_unused]] const auto &[prop, counter] : countOfTasksInSystem_) {
455 // Atomic with acquire order reason: need to sync with all prev fetch_adds and fetch_subs
456 sumCount += counter.load(std::memory_order_acquire);
457 }
458 return sumCount;
459 }
460
IsTaskLifetimeStatisticsUsed() const461 bool TaskScheduler::IsTaskLifetimeStatisticsUsed() const
462 {
463 return taskTimeStatsType_ != TaskTimeStatsType::NO_STATISTICS;
464 }
465
GetTaskTimeStats() const466 TaskTimeStatsBase *TaskScheduler::GetTaskTimeStats() const
467 {
468 return taskTimeStats_;
469 }
470
471 } // namespace ark::taskmanager
472