• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "libpandabase/taskmanager/task_scheduler.h"
17 #include "libpandabase/taskmanager/task.h"
18 #include "libpandabase/utils/logger.h"
19 #include "libpandabase/os/thread.h"
20 
21 namespace ark::taskmanager {
22 
WorkerThread(const std::string & name)23 WorkerThread::WorkerThread(const std::string &name) : scheduler_(TaskScheduler::GetTaskScheduler()), name_(name)
24 {
25     perWorkerPopId_[this] = localQueue_.RegisterConsumer();
26     schedulerPopId_ = localQueue_.RegisterConsumer();
27     thread_ = new std::thread(&WorkerThread::WorkerLoop, this);
28     [[maybe_unused]] auto setNameCore = os::thread::SetThreadName(thread_->native_handle(), name.c_str());
29     ASSERT(setNameCore == 0);
30 }
31 
AddTask(Task && task)32 void WorkerThread::AddTask(Task &&task)
33 {
34     localQueue_.Push(std::move(task));
35 }
36 
Join()37 void WorkerThread::Join()
38 {
39     thread_->join();
40 }
41 
IsEmpty() const42 bool WorkerThread::IsEmpty() const
43 {
44     return localQueue_.IsEmpty();
45 }
46 
Size() const47 size_t WorkerThread::Size() const
48 {
49     return localQueue_.Size();
50 }
51 
CountOfTasksWithProperties(TaskProperties properties) const52 size_t WorkerThread::CountOfTasksWithProperties(TaskProperties properties) const
53 {
54     return localQueue_.CountOfTasksWithProperties(properties);
55 }
56 
WorkerLoop()57 void WorkerThread::WorkerLoop()
58 {
59     auto *scheduler = TaskScheduler::GetTaskScheduler();
60     ASSERT(scheduler != nullptr);
61     if (scheduler->IsTaskLifetimeStatisticsUsed()) {
62         TaskScheduler::GetTaskScheduler()->GetTaskTimeStats()->RegisterWorkerThread();
63     }
64     WaitForStart();
65     auto finishCond = false;
66     while (!finishCond) {
67         ASSERT(finishedTasksCounterMap_.empty());
68         // Worker will steal tasks only if all queues are empty and it's possible to find worker for stealing
69         if (UNLIKELY(scheduler_->AreQueuesEmpty() && !scheduler_->AreWorkersEmpty())) {
70             scheduler_->StealTaskFromOtherWorker(this);
71             if (stolenTask_.IsInvalid()) {
72                 continue;
73             }
74             ExecuteStolenTask();
75         } else {  // Else it will try get/wait tasks.
76             finishCond = scheduler_->FillWithTasks(this);
77         }
78         ExecuteTasksFromLocalQueue();
79         [[maybe_unused]] size_t countOfTasks = scheduler_->IncrementCounterOfExecutedTasks(finishedTasksCounterMap_);
80         LOG(DEBUG, TASK_MANAGER) << GetWorkerName() << ": executed tasks: " << countOfTasks;
81         countOfExecutedTask_ += countOfTasks;
82         finishedTasksCounterMap_.clear();
83     }
84     LOG(DEBUG, TASK_MANAGER) << GetWorkerName() << " have executed tasks at all: " << countOfExecutedTask_;
85 }
86 
ExecuteTasksFromLocalQueue()87 size_t WorkerThread::ExecuteTasksFromLocalQueue()
88 {
89     // Start popping task from local queue and executing them
90     size_t executeTasksCount = 0UL;
91     for (; !localQueue_.IsEmpty(); executeTasksCount++) {
92         auto task = localQueue_.Pop(perWorkerPopId_[this]);
93         // If pop task returned nullopt need to finish execution
94         if (UNLIKELY(!task.has_value())) {
95             break;
96         }
97         task->RunTask();
98         finishedTasksCounterMap_[task->GetTaskProperties()]++;
99     }
100     return executeTasksCount;
101 }
102 
ExecuteStolenTask()103 void WorkerThread::ExecuteStolenTask()
104 {
105     ExecuteTask(&stolenTask_);
106 }
107 
ExecuteTask(Task * task)108 void WorkerThread::ExecuteTask(Task *task)
109 {
110     ASSERT(task != nullptr);
111     auto prop = task->GetTaskProperties();
112     task->RunTask();
113     task->MakeInvalid();
114     finishedTasksCounterMap_[prop]++;
115 }
116 
Start()117 void WorkerThread::Start()
118 {
119     os::memory::LockHolder<os::memory::Mutex> lockHolder(startWaitLock_);
120     start_ = true;
121     startWaitCondVar_.SignalAll();
122 }
123 
WaitForStart()124 void WorkerThread::WaitForStart()
125 {
126     os::memory::LockHolder<os::memory::Mutex> lockHolder(startWaitLock_);
127     while (!start_) {
128         startWaitCondVar_.Wait(&startWaitLock_);
129     }
130 }
131 
RegisterAllWorkersInLocalQueue(const std::vector<WorkerThread * > & workers)132 void WorkerThread::RegisterAllWorkersInLocalQueue(const std::vector<WorkerThread *> &workers)
133 {
134     for (auto *worker : workers) {
135         if (worker == this) {
136             continue;
137         }
138         perWorkerPopId_[worker] = localQueue_.RegisterConsumer();
139     }
140 }
141 
GetWorkerName() const142 std::string WorkerThread::GetWorkerName() const
143 {
144     return name_;
145 }
146 
GetLocalWorkerQueuePopId(WorkerThread * worker) const147 size_t WorkerThread::GetLocalWorkerQueuePopId(WorkerThread *worker) const
148 {
149     return perWorkerPopId_.at(worker);
150 }
151 
GetLocalWorkerQueueSchedulerPopId() const152 size_t WorkerThread::GetLocalWorkerQueueSchedulerPopId() const
153 {
154     return schedulerPopId_;
155 }
156 
SetStolenTask(Task && stolenTask)157 void WorkerThread::SetStolenTask(Task &&stolenTask)
158 {
159     ASSERT(stolenTask_.IsInvalid());
160     stolenTask_ = std::move(stolenTask);
161 }
162 
TryDeleteRetiredPtrs()163 void WorkerThread::TryDeleteRetiredPtrs()
164 {
165     localQueue_.TryDeleteRetiredPtrs();
166 }
167 
~WorkerThread()168 WorkerThread::~WorkerThread()
169 {
170     delete thread_;
171 }
172 
173 }  // namespace ark::taskmanager
174