• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "libpandabase/taskmanager/task_scheduler.h"
17 #include "libpandabase/utils/logger.h"
18 
19 namespace ark::taskmanager {
20 
21 TaskScheduler *TaskScheduler::instance_ = nullptr;
22 
TaskScheduler(size_t workersCount,TaskTimeStatsType taskTimeStatsType)23 TaskScheduler::TaskScheduler(size_t workersCount, TaskTimeStatsType taskTimeStatsType)
24     : workersCount_(workersCount), taskTimeStatsType_(taskTimeStatsType), selector_(taskQueues_)
25 {
26     switch (taskTimeStatsType) {
27         case TaskTimeStatsType::LIGHT_STATISTICS:
28             taskTimeStats_ = new internal::LightTaskTimeTimeStats(workersCount);
29             break;
30         case TaskTimeStatsType::NO_STATISTICS:
31             break;
32         default:
33             UNREACHABLE();
34     }
35 }
36 
37 /* static */
Create(size_t threadsCount,TaskTimeStatsType taskTimeStatsType)38 TaskScheduler *TaskScheduler::Create(size_t threadsCount, TaskTimeStatsType taskTimeStatsType)
39 {
40     ASSERT(instance_ == nullptr);
41     ASSERT(threadsCount > 0);
42     instance_ = new TaskScheduler(threadsCount, taskTimeStatsType);
43     return instance_;
44 }
45 
46 /* static */
GetTaskScheduler()47 TaskScheduler *TaskScheduler::GetTaskScheduler()
48 {
49     return instance_;
50 }
51 
52 /* static */
Destroy()53 void TaskScheduler::Destroy()
54 {
55     ASSERT(instance_ != nullptr);
56     delete instance_;
57     instance_ = nullptr;
58 }
59 
RegisterQueue(internal::SchedulableTaskQueueInterface * queue)60 TaskQueueId TaskScheduler::RegisterQueue(internal::SchedulableTaskQueueInterface *queue)
61 {
62     os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
63     ASSERT(!start_);
64     LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: Register task queue with {" << queue->GetTaskType() << ", "
65                              << queue->GetVMType() << "}";
66     TaskQueueId id(queue->GetTaskType(), queue->GetVMType());
67     if (UNLIKELY(taskQueues_.find(id) != taskQueues_.end())) {
68         return INVALID_TASKQUEUE_ID;
69     }
70     taskQueues_[id] = queue;
71     queue->SetCallbacks(
72         [this](TaskProperties properties, size_t count) { this->IncrementCounterOfAddedTasks(properties, count); },
73         [this]() { this->SignalWorkers(); });
74 
75     // init countOfTasksIsSystem_ for possible task from registered queue
76     countOfTasksInSystem_[{queue->GetTaskType(), queue->GetVMType(), TaskExecutionMode::FOREGROUND}] = 0U;
77     countOfTasksInSystem_[{queue->GetTaskType(), queue->GetVMType(), TaskExecutionMode::BACKGROUND}] = 0U;
78 
79     return id;
80 }
81 
Initialize()82 void TaskScheduler::Initialize()
83 {
84     ASSERT(!start_);
85     selector_.Init();
86     start_ = true;
87     LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: creates " << workersCount_ << " threads";
88     // Starts all workers
89     for (size_t i = 0; i < workersCount_; i++) {
90         workers_.push_back(new WorkerThread("TSWorker_" + std::to_string(i + 1UL)));
91         LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: created thread with name " << workers_.back()->GetWorkerName();
92     }
93     // Set names of workers and get them info about other ones
94     for (auto *worker : workers_) {
95         worker->RegisterAllWorkersInLocalQueue(workers_);
96     }
97     // Start worker loop executing
98     for (auto *worker : workers_) {
99         worker->Start();
100     }
101     // Atomic with release order reason: other thread should see last value
102     disableHelpers_.store(false, std::memory_order_release);
103 }
104 
StealTaskFromOtherWorker(WorkerThread * taskReceiver)105 void TaskScheduler::StealTaskFromOtherWorker(WorkerThread *taskReceiver)
106 {
107     // WorkerThread tries to find Worker with the most tasks
108     auto chosenWorker =
109         *std::max_element(workers_.begin(), workers_.end(),
110                           [](const WorkerThread *lv, const WorkerThread *rv) { return lv->Size() < rv->Size(); });
111     if (chosenWorker->Size() == 0) {
112         return;
113     }
114     // If worker was successfully found, steals task from its local queue
115     chosenWorker->GiveTasksToAnotherWorker(
116         [taskReceiver](Task &&task) { taskReceiver->SetStolenTask(std::move(task)); }, 1UL,
117         chosenWorker->GetLocalWorkerQueuePopId(taskReceiver));
118 }
119 
FillWithTasks(WorkerThread * worker)120 bool TaskScheduler::FillWithTasks(WorkerThread *worker)
121 {
122     ASSERT(start_);
123     std::queue<Task> readyTasks;
124     {
125         os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
126         // We increment counter of waiters to signal them in future
127         if (AreQueuesEmpty() && !waitList_.HaveReadyValue()) {
128             if (!WaitUntilNewTasks()) {
129                 return true;
130             }
131         }
132         if (!waitList_.HaveReadyValue()) {
133             // Use selector to choose next queue to pop task
134             auto selectedQueue = selector_.SelectQueue();
135             ASSERT(selectedQueue != INVALID_TASKQUEUE_ID);
136             // Getting task from selected queue
137             PutTasksInWorker(worker, selectedQueue);
138             return false;
139         }
140         // Wait list have task that should be added in queues. So worker firstly should add this task in local queue
141         // under lock-holder. Next in should add them in TaskQueues
142         PutWaitTaskInLocalQueue(readyTasks);
143     }
144     // Worker puts tasks to TaskQueues without lock-holder to avoid unnecessary waiting of other workers
145     PutTaskInTaskQueues(readyTasks);
146     return false;
147 }
148 
WaitUntilNewTasks()149 bool TaskScheduler::WaitUntilNewTasks()
150 {
151     // Atomic with acq_rel order reason: sync for counter
152     waitWorkersCount_.fetch_add(1, std::memory_order_acq_rel);
153     while (AreQueuesEmpty() && !waitList_.HaveReadyValue() && !finish_) {
154         queuesWaitCondVar_.TimedWait(&taskSchedulerStateLock_, TASK_WAIT_TIMEOUT);
155     }
156     // Atomic with acq_rel order reason: sync for counter
157     waitWorkersCount_.fetch_sub(1, std::memory_order_acq_rel);
158     return !finish_;
159 }
160 
PutTasksInWorker(WorkerThread * worker,TaskQueueId selectedQueue)161 size_t TaskScheduler::PutTasksInWorker(WorkerThread *worker, TaskQueueId selectedQueue)
162 {
163     auto addTaskFunc = [worker](Task &&task) { worker->AddTask(std::move(task)); };
164     auto queue = taskQueues_[selectedQueue];
165 
166     // Now we calc how many task we want to get from queue. If there are few tasks, then we want them to be evenly
167     // distributed among the workers.
168     size_t size = queue->Size();
169     size_t countToGet = size / workers_.size();
170     countToGet = (countToGet >= WorkerThread::WORKER_QUEUE_SIZE) ? WorkerThread::WORKER_QUEUE_SIZE
171                  : (size % workers_.size() == 0)                 ? countToGet
172                                                                  : countToGet + 1;
173     // Firstly we use method to delete retired ptrs
174     worker->TryDeleteRetiredPtrs();
175     // Execute popping task form queue
176     size_t queueTaskCount = queue->PopTasksToWorker(addTaskFunc, countToGet);
177     LOG(DEBUG, TASK_MANAGER) << worker->GetWorkerName() << ": get tasks " << queueTaskCount << "; ";
178     return queueTaskCount;
179 }
180 
AreQueuesEmpty() const181 bool TaskScheduler::AreQueuesEmpty() const
182 {
183     for ([[maybe_unused]] const auto &[id, queue] : taskQueues_) {
184         ASSERT(queue != nullptr);
185         if (!queue->IsEmpty()) {
186             return false;
187         }
188     }
189     return true;
190 }
191 
AreWorkersEmpty() const192 bool TaskScheduler::AreWorkersEmpty() const
193 {
194     for (auto *worker : workers_) {
195         if (!worker->IsEmpty()) {
196             return false;
197         }
198     }
199     return true;
200 }
201 
AreNoMoreTasks() const202 bool TaskScheduler::AreNoMoreTasks() const
203 {
204     return GetCountOfTasksInSystem() == 0;
205 }
206 
HelpWorkersWithTasks(TaskProperties properties)207 size_t TaskScheduler::HelpWorkersWithTasks(TaskProperties properties)
208 {
209     // Atomic with acquire order reason: getting correct value
210     if (disableHelpers_.load(std::memory_order_acquire)) {
211         return 0;
212     }
213     size_t executedTasksCount = 0;
214     auto *queue = GetQueue({properties.GetTaskType(), properties.GetVMType()});
215     if (queue->HasTaskWithExecutionMode(properties.GetTaskExecutionMode())) {
216         executedTasksCount = GetAndExecuteSetOfTasksFromQueue(properties);
217     } else if (LIKELY(!workers_.empty())) {
218         executedTasksCount = StealAndExecuteOneTaskFromWorkers(properties);
219     }
220     if (UNLIKELY(executedTasksCount == 0)) {
221         LOG(DEBUG, TASK_MANAGER) << "Helper: got no tasks;";
222         return 0;
223     }
224     LOG(DEBUG, TASK_MANAGER) << "Helper: executed tasks: " << executedTasksCount << ";";
225     DecrementCountOfTasksInSystem(properties, executedTasksCount);
226 
227     // Atomic with acquire order reason: get correct value
228     auto waitToFinish = waitToFinish_.load(std::memory_order_acquire);
229     if (waitToFinish > 0 && GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
230         os::memory::LockHolder taskManagerLockHolder(taskSchedulerStateLock_);
231         finishTasksCondVar_.SignalAll();
232     }
233     return executedTasksCount;
234 }
235 
GetAndExecuteSetOfTasksFromQueue(TaskProperties properties)236 size_t TaskScheduler::GetAndExecuteSetOfTasksFromQueue(TaskProperties properties)
237 {
238     auto *queue = GetQueue({properties.GetTaskType(), properties.GetVMType()});
239     if (queue->IsEmpty()) {
240         return 0;
241     }
242 
243     std::queue<Task> taskQueue;
244     size_t realCount = 0;
245     {
246         os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
247         size_t size = queue->CountOfTasksWithExecutionMode(properties.GetTaskExecutionMode());
248         size_t countToGet = size / (workers_.size() + 1);
249         countToGet = (countToGet >= WorkerThread::WORKER_QUEUE_SIZE) ? WorkerThread::WORKER_QUEUE_SIZE
250                      : (size % (workers_.size() + 1) == 0)           ? countToGet
251                                                                      : countToGet + 1;
252         realCount = queue->PopTasksToHelperThread([&taskQueue](Task &&task) { taskQueue.push(std::move(task)); },
253                                                   countToGet, properties.GetTaskExecutionMode());
254     }
255     while (!taskQueue.empty()) {
256         taskQueue.front().RunTask();
257         taskQueue.pop();
258     }
259     return realCount;
260 }
261 
StealAndExecuteOneTaskFromWorkers(TaskProperties properties)262 size_t TaskScheduler::StealAndExecuteOneTaskFromWorkers(TaskProperties properties)
263 {
264     ASSERT(!workers_.empty());
265     std::queue<Task> taskQueue;
266     auto addTaskToQueue = [&taskQueue](Task &&task) { taskQueue.push(std::move(task)); };
267     while (true) {
268         auto chosenWorker = *std::max_element(
269             workers_.begin(), workers_.end(), [&properties](const WorkerThread *lv, const WorkerThread *rv) {
270                 return lv->CountOfTasksWithProperties(properties) < rv->CountOfTasksWithProperties(properties);
271             });
272         if UNLIKELY (chosenWorker->CountOfTasksWithProperties(properties) == 0) {
273             return 0;
274         }
275 
276         auto stolen = chosenWorker->GiveTasksToAnotherWorker(
277             addTaskToQueue, 1UL, chosenWorker->GetLocalWorkerQueueSchedulerPopId(), properties);
278         if (stolen == 0) {  // check if stealing was successful
279             // if we did not stole we should retry
280             continue;
281         }
282         while (!taskQueue.empty()) {
283             taskQueue.front().RunTask();
284             taskQueue.pop();
285         }
286         return stolen;
287     }
288 }
289 
WaitForFinishAllTasksWithProperties(TaskProperties properties)290 void TaskScheduler::WaitForFinishAllTasksWithProperties(TaskProperties properties)
291 {
292     os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
293     // Atomic with acq_rel order reason: other thread should see correct value
294     waitToFinish_.fetch_add(1, std::memory_order_acq_rel);
295     while (GetCountOfTasksInSystemWithTaskProperties(properties) != 0) {
296         finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
297     }
298     // Atomic with acq_rel order reason: other thread should see correct value
299     waitToFinish_.fetch_sub(1, std::memory_order_acq_rel);
300 }
301 
Finalize()302 void TaskScheduler::Finalize()
303 {
304     ASSERT(start_);
305     {
306         // Wait all tasks will be done
307         os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
308         // Atomic with acq_rel order reason: other thread should
309         // see correct value
310         waitToFinish_.fetch_add(1, std::memory_order_acq_rel);
311         while (!AreNoMoreTasks()) {
312             finishTasksCondVar_.Wait(&taskSchedulerStateLock_);
313         }
314         finish_ = true;
315         // Atomic with release order reason: other thread should see last value
316         disableHelpers_.store(true, std::memory_order_release);
317         // Atomic with acq_rel order reason: other thread should
318         // see correct value
319         waitToFinish_.fetch_sub(1, std::memory_order_acq_rel);
320         queuesWaitCondVar_.SignalAll();
321     }
322     for (auto *worker : workers_) {
323         worker->Join();
324     }
325     for (auto *worker : workers_) {
326         delete worker;
327     }
328 
329     if (IsTaskLifetimeStatisticsUsed()) {
330         for (const auto &line : taskTimeStats_->GetTaskStatistics()) {
331             LOG(INFO, TASK_MANAGER) << line;
332         }
333     }
334     LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: Finalized";
335 }
336 
IncrementCounterOfAddedTasks(TaskProperties properties,size_t ivalue)337 void TaskScheduler::IncrementCounterOfAddedTasks(TaskProperties properties, size_t ivalue)
338 {
339     IncrementCountOfTasksInSystem(properties, ivalue);
340 }
341 
IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap & counterMap)342 size_t TaskScheduler::IncrementCounterOfExecutedTasks(const TaskPropertiesCounterMap &counterMap)
343 {
344     size_t countOfTasks = 0;
345     for (const auto &[properties, count] : counterMap) {
346         countOfTasks += count;
347         DecrementCountOfTasksInSystem(properties, count);
348         // Atomic with acquire order reason: get correct value
349         auto waitToFinish = waitToFinish_.load(std::memory_order_acquire);
350         if (waitToFinish > 0 && GetCountOfTasksInSystemWithTaskProperties(properties) == 0) {
351             os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
352             finishTasksCondVar_.SignalAll();
353         }
354     }
355     return countOfTasks;
356 }
357 
SignalWorkers()358 void TaskScheduler::SignalWorkers()
359 {
360     // Atomic with acquire order reason: get correct value
361     if (waitWorkersCount_.load(std::memory_order_acquire) > 0) {
362         os::memory::LockHolder outsideLockHolder(taskSchedulerStateLock_);
363         queuesWaitCondVar_.Signal();
364     }
365 }
366 
GetQueue(TaskQueueId id) const367 internal::SchedulableTaskQueueInterface *TaskScheduler::GetQueue(TaskQueueId id) const
368 {
369     internal::SchedulableTaskQueueInterface *queue = nullptr;
370     auto taskQueuesIterator = taskQueues_.find(id);
371     if (taskQueuesIterator == taskQueues_.end()) {
372         LOG(FATAL, COMMON) << "Attempt to take a task from a non-existent queue";
373     }
374     std::tie(std::ignore, queue) = *taskQueuesIterator;
375     return queue;
376 }
377 
AddTaskToWaitListWithTimeout(Task && task,uint64_t time)378 WaiterId TaskScheduler::AddTaskToWaitListWithTimeout(Task &&task, uint64_t time)
379 {
380     os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
381     this->IncrementCounterOfAddedTasks(task.GetTaskProperties(), 1U);
382     return waitList_.AddValueToWait(std::move(task), time);
383 }
384 
AddTaskToWaitList(Task && task)385 WaiterId TaskScheduler::AddTaskToWaitList(Task &&task)
386 {
387     // Use adding with max time as possible, wait list will understand that it should set max possible time
388     return AddTaskToWaitListWithTimeout(std::move(task), std::numeric_limits<uint64_t>().max());
389 }
390 
PutWaitTaskInLocalQueue(LocalTaskQueue & queue)391 void TaskScheduler::PutWaitTaskInLocalQueue(LocalTaskQueue &queue)
392 {
393     for (auto task = waitList_.GetReadyValue(); task.has_value(); task = waitList_.GetReadyValue()) {
394         queue.push(std::move(task.value()));
395     }
396 }
397 
PutTaskInTaskQueues(LocalTaskQueue & queue)398 void TaskScheduler::PutTaskInTaskQueues(LocalTaskQueue &queue)
399 {
400     while (!queue.empty()) {
401         Task task = std::move(queue.front());
402         queue.pop();
403         auto prop = task.GetTaskProperties();
404         auto *taskQueue = GetQueue({prop.GetTaskType(), prop.GetVMType()});
405         taskQueue->AddTaskWithoutNewTaskCallbackExecution(std::move(task));
406     }
407 }
408 
SignalWaitList(WaiterId waiterId)409 void TaskScheduler::SignalWaitList(WaiterId waiterId)
410 {
411     std::optional<Task> task;
412     {
413         os::memory::LockHolder lockHolder(taskSchedulerStateLock_);
414         task = waitList_.GetValueById(waiterId);
415     }
416     if (!task.has_value()) {
417         return;
418     }
419     auto prop = task->GetTaskProperties();
420     auto *queue = GetQueue({prop.GetTaskType(), prop.GetVMType()});
421     queue->AddTaskWithoutNewTaskCallbackExecution(std::move(task.value()));
422 }
423 
~TaskScheduler()424 TaskScheduler::~TaskScheduler()
425 {
426     // We can delete TaskScheduler if it wasn't started or it was finished
427     ASSERT(start_ == finish_);
428     // Check if all task queue was deleted
429     ASSERT(taskQueues_.empty());
430     delete taskTimeStats_;
431 }
432 
IncrementCountOfTasksInSystem(TaskProperties prop,size_t count)433 void TaskScheduler::IncrementCountOfTasksInSystem(TaskProperties prop, size_t count)
434 {
435     // Atomic with acq_rel order reason: fast add count to countOfTasksInSystem_[prop]
436     countOfTasksInSystem_[prop].fetch_add(count, std::memory_order_acq_rel);
437 }
438 
DecrementCountOfTasksInSystem(TaskProperties prop,size_t count)439 void TaskScheduler::DecrementCountOfTasksInSystem(TaskProperties prop, size_t count)
440 {
441     // Atomic with acq_rel order reason: fast sub count to countOfTasksInSystem_[prop]
442     countOfTasksInSystem_[prop].fetch_sub(count, std::memory_order_acq_rel);
443 }
444 
GetCountOfTasksInSystemWithTaskProperties(TaskProperties prop) const445 size_t TaskScheduler::GetCountOfTasksInSystemWithTaskProperties(TaskProperties prop) const
446 {
447     // Atomic with acquire order reason: need to sync with all prev fetch_adds and fetch_subs
448     return countOfTasksInSystem_.at(prop).load(std::memory_order_acquire);
449 }
450 
GetCountOfTasksInSystem() const451 size_t TaskScheduler::GetCountOfTasksInSystem() const
452 {
453     size_t sumCount = 0;
454     for ([[maybe_unused]] const auto &[prop, counter] : countOfTasksInSystem_) {
455         // Atomic with acquire order reason: need to sync with all prev fetch_adds and fetch_subs
456         sumCount += counter.load(std::memory_order_acquire);
457     }
458     return sumCount;
459 }
460 
IsTaskLifetimeStatisticsUsed() const461 bool TaskScheduler::IsTaskLifetimeStatisticsUsed() const
462 {
463     return taskTimeStatsType_ != TaskTimeStatsType::NO_STATISTICS;
464 }
465 
GetTaskTimeStats() const466 TaskTimeStatsBase *TaskScheduler::GetTaskTimeStats() const
467 {
468     return taskTimeStats_;
469 }
470 
471 }  // namespace ark::taskmanager
472