1 /*
2 * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "libpandabase/taskmanager/task_scheduler.h"
17 #include "libpandabase/taskmanager/task.h"
18 #include "libpandabase/utils/logger.h"
19 #include "libpandabase/os/thread.h"
20
21 namespace ark::taskmanager {
22
WorkerThread(const std::string & name)23 WorkerThread::WorkerThread(const std::string &name) : scheduler_(TaskScheduler::GetTaskScheduler()), name_(name)
24 {
25 perWorkerPopId_[this] = localQueue_.RegisterConsumer();
26 schedulerPopId_ = localQueue_.RegisterConsumer();
27 thread_ = new std::thread(&WorkerThread::WorkerLoop, this);
28 [[maybe_unused]] auto setNameCore = os::thread::SetThreadName(thread_->native_handle(), name.c_str());
29 ASSERT(setNameCore == 0);
30 }
31
AddTask(Task && task)32 void WorkerThread::AddTask(Task &&task)
33 {
34 localQueue_.Push(std::move(task));
35 }
36
Join()37 void WorkerThread::Join()
38 {
39 thread_->join();
40 }
41
IsEmpty() const42 bool WorkerThread::IsEmpty() const
43 {
44 return localQueue_.IsEmpty();
45 }
46
Size() const47 size_t WorkerThread::Size() const
48 {
49 return localQueue_.Size();
50 }
51
CountOfTasksWithProperties(TaskProperties properties) const52 size_t WorkerThread::CountOfTasksWithProperties(TaskProperties properties) const
53 {
54 return localQueue_.CountOfTasksWithProperties(properties);
55 }
56
WorkerLoop()57 void WorkerThread::WorkerLoop()
58 {
59 auto *scheduler = TaskScheduler::GetTaskScheduler();
60 ASSERT(scheduler != nullptr);
61 if (scheduler->IsTaskLifetimeStatisticsUsed()) {
62 TaskScheduler::GetTaskScheduler()->GetTaskTimeStats()->RegisterWorkerThread();
63 }
64 WaitForStart();
65 auto finishCond = false;
66 while (!finishCond) {
67 ASSERT(finishedTasksCounterMap_.empty());
68 // Worker will steal tasks only if all queues are empty and it's possible to find worker for stealing
69 if (UNLIKELY(scheduler_->AreQueuesEmpty() && !scheduler_->AreWorkersEmpty())) {
70 scheduler_->StealTaskFromOtherWorker(this);
71 if (stolenTask_.IsInvalid()) {
72 continue;
73 }
74 ExecuteStolenTask();
75 } else { // Else it will try get/wait tasks.
76 finishCond = scheduler_->FillWithTasks(this);
77 }
78 ExecuteTasksFromLocalQueue();
79 [[maybe_unused]] size_t countOfTasks = scheduler_->IncrementCounterOfExecutedTasks(finishedTasksCounterMap_);
80 LOG(DEBUG, TASK_MANAGER) << GetWorkerName() << ": executed tasks: " << countOfTasks;
81 countOfExecutedTask_ += countOfTasks;
82 finishedTasksCounterMap_.clear();
83 }
84 LOG(DEBUG, TASK_MANAGER) << GetWorkerName() << " have executed tasks at all: " << countOfExecutedTask_;
85 }
86
ExecuteTasksFromLocalQueue()87 size_t WorkerThread::ExecuteTasksFromLocalQueue()
88 {
89 // Start popping task from local queue and executing them
90 size_t executeTasksCount = 0UL;
91 for (; !localQueue_.IsEmpty(); executeTasksCount++) {
92 auto task = localQueue_.Pop(perWorkerPopId_[this]);
93 // If pop task returned nullopt need to finish execution
94 if (UNLIKELY(!task.has_value())) {
95 break;
96 }
97 task->RunTask();
98 finishedTasksCounterMap_[task->GetTaskProperties()]++;
99 }
100 return executeTasksCount;
101 }
102
ExecuteStolenTask()103 void WorkerThread::ExecuteStolenTask()
104 {
105 ExecuteTask(&stolenTask_);
106 }
107
ExecuteTask(Task * task)108 void WorkerThread::ExecuteTask(Task *task)
109 {
110 ASSERT(task != nullptr);
111 auto prop = task->GetTaskProperties();
112 task->RunTask();
113 task->MakeInvalid();
114 finishedTasksCounterMap_[prop]++;
115 }
116
Start()117 void WorkerThread::Start()
118 {
119 os::memory::LockHolder<os::memory::Mutex> lockHolder(startWaitLock_);
120 start_ = true;
121 startWaitCondVar_.SignalAll();
122 }
123
WaitForStart()124 void WorkerThread::WaitForStart()
125 {
126 os::memory::LockHolder<os::memory::Mutex> lockHolder(startWaitLock_);
127 while (!start_) {
128 startWaitCondVar_.Wait(&startWaitLock_);
129 }
130 }
131
RegisterAllWorkersInLocalQueue(const std::vector<WorkerThread * > & workers)132 void WorkerThread::RegisterAllWorkersInLocalQueue(const std::vector<WorkerThread *> &workers)
133 {
134 for (auto *worker : workers) {
135 if (worker == this) {
136 continue;
137 }
138 perWorkerPopId_[worker] = localQueue_.RegisterConsumer();
139 }
140 }
141
GetWorkerName() const142 std::string WorkerThread::GetWorkerName() const
143 {
144 return name_;
145 }
146
GetLocalWorkerQueuePopId(WorkerThread * worker) const147 size_t WorkerThread::GetLocalWorkerQueuePopId(WorkerThread *worker) const
148 {
149 return perWorkerPopId_.at(worker);
150 }
151
GetLocalWorkerQueueSchedulerPopId() const152 size_t WorkerThread::GetLocalWorkerQueueSchedulerPopId() const
153 {
154 return schedulerPopId_;
155 }
156
SetStolenTask(Task && stolenTask)157 void WorkerThread::SetStolenTask(Task &&stolenTask)
158 {
159 ASSERT(stolenTask_.IsInvalid());
160 stolenTask_ = std::move(stolenTask);
161 }
162
TryDeleteRetiredPtrs()163 void WorkerThread::TryDeleteRetiredPtrs()
164 {
165 localQueue_.TryDeleteRetiredPtrs();
166 }
167
~WorkerThread()168 WorkerThread::~WorkerThread()
169 {
170 delete thread_;
171 }
172
173 } // namespace ark::taskmanager
174