/*
 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "libpandabase/taskmanager/task_scheduler.h"
#include <atomic>
#include "libpandabase/taskmanager/task_queue_set.h"
#include "libpandabase/utils/logger.h"
#include "libpandabase/os/mutex.h"

namespace ark::taskmanager::internal {

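// Creates the scheduler and immediately starts workersCount worker threads.
// The scheduler stores raw pointers to waitList and queueSet, so both are expected to outlive it.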
TaskScheduler::TaskScheduler(size_t workersCount, TaskWaitList *waitList, TaskQueueSet *queueSet)
    : waitList_(waitList), queueSet_(queueSet)
{
    ASSERT(!start_);
    start_ = true;
    LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: creates " << workersCount << " threads";
    // Starts all workers
    SetCountOfWorkers(workersCount);
}

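// Steals a single task for taskReceiver from the worker that currently holds the most tasks.
// Returns the number of stolen tasks: 1 on success, 0 if no worker has more than one task.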
size_t TaskScheduler::StealTaskFromOtherWorker(WorkerThread *taskReceiver)
{
    // WorkerThread tries to find the worker with the most tasks
    // CC-OFFNXT(G.CTL.03): false positive
    while (true) {
        // When stealing starts, the receiving worker is empty, so if it is chosen itself, we leave the method with 0
        WorkerThread *chosenWorker = taskReceiver;
        size_t taskCount = 0;
        // First stage: find the worker with the highest task count
        // Atomic with relaxed order reason: no order required
        size_t workersCount = workersCount_.load(std::memory_order_relaxed);
        for (size_t i = 0; i < workersCount; i++) {
            auto *worker = workers_[i].load(std::memory_order_relaxed);
            if (worker != nullptr && taskCount < worker->Size()) {
                chosenWorker = worker;
                taskCount = worker->Size();
            }
        }
        // Check if the chosen worker contains enough tasks
        if (taskCount <= 1U) {
            return 0U;
        }
        // If a worker was successfully found, steal a task from its local queue
        TaskPtr task = chosenWorker->PopForegroundTask();
        if (task != nullptr) {
            taskReceiver->AddForegroundTask(std::move(task));
            return 1U;
        }
        task = chosenWorker->PopBackgroundTask();
        if (task != nullptr) {
            taskReceiver->AddBackgroundTask(std::move(task));
            return 1U;
        }
        // If popping a task failed, retry
    }
}

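// Supplies the worker with tasks: first from the shared queue set, then by stealing from other
// workers; if neither yields anything, waits until new tasks may appear and retries.
// Returns true only if the worker was finished while waiting, i.e. it should terminate.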
bool TaskScheduler::FillWithTasks(WorkerThread *worker)
{
    // CC-OFFNXT(G.CTL.03): false positive
    while (true) {
        ProcessWaitList();
        auto queue = queueSet_->SelectQueue();
        if (queue != nullptr) {
            PutTasksInWorker(worker, reinterpret_cast<internal::SchedulableTaskQueueInterface *>(queue));
            return false;
        }

        if (GetCountOfTasksInSystem() != 0U && StealTaskFromOtherWorker(worker) != 0) {
            return false;
        }

        if (WaitUntilNewTasks(worker)) {
            return true;
        }
    }
}

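// Reposts tasks whose wait is over from the wait list back into the system. At most one thread
// processes the wait list at a time; concurrent callers return 0 immediately.
// Returns the number of reposted tasks.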
size_t TaskScheduler::ProcessWaitList()
{
    // Atomic with relaxed order reason: no sync depends
    if (waitListIsProcessing_.exchange(true, std::memory_order_relaxed)) {
        return 0;
    }
    if (!waitList_->HaveReadyValue()) {
        // Atomic with relaxed order reason: no sync depends
        waitListIsProcessing_.store(false, std::memory_order_relaxed);
        return 0;
    }
    auto count = waitList_->ProcessWaitList([](TaskWaitListElem &&waitVal) {
        auto &[task, taskPoster] = waitVal;
        taskPoster(std::move(task));
    });
    // Atomic with relaxed order reason: no sync depends
    waitListIsProcessing_.store(false, std::memory_order_relaxed);
    return count;
}

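// Parks the worker on the condition variable until it is signaled or TASK_WAIT_TIMEOUT expires.
// Returns true if the worker was asked to finish in the meantime.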
bool TaskScheduler::WaitUntilNewTasks(WorkerThread *worker)
{
    os::memory::LockHolder lh(taskSchedulerStateLock_);
    // Atomic with relaxed order reason: no sync depends
    waitWorkersCount_.fetch_add(1, std::memory_order_relaxed);
    // We don't use a while loop here because a WorkerThread should be fast: if it was notified, the wait worked
    // correctly; if it was not, we return to the worker, which retries all the checks, so the behaviour is correct.
    if (AreQueuesEmpty() || !waitList_->HaveReadyValue()) {
        queuesWaitCondVar_.TimedWait(&taskSchedulerStateLock_, TASK_WAIT_TIMEOUT);
    }
    // Atomic with relaxed order reason: no sync depends
    waitWorkersCount_.fetch_sub(1, std::memory_order_relaxed);
    return worker->CheckFinish();
}

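// Moves a batch of tasks from the given queue into the worker's local queues. The batch size is
// proportional to the queue size divided by the number of workers, capped by the worker queue
// capacity, so that tasks spread evenly when few are available.
// Returns the number of tasks actually moved.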
size_t TaskScheduler::PutTasksInWorker(WorkerThread *worker, internal::SchedulableTaskQueueInterface *queue)
{
    auto addForegroundTaskFunc = [worker](TaskPtr &&task) { worker->AddForegroundTask(std::move(task)); };
    auto addBackgroundTaskFunc = [worker](TaskPtr &&task) { worker->AddBackgroundTask(std::move(task)); };

    // Now we calculate how many tasks we want to get from the queue. If there are few tasks, we want them to be
    // evenly distributed among the workers.
    size_t size = queue->Size();
    // Atomic with relaxed order reason: no order required
    size_t workersCount = workersCount_.load(std::memory_order_relaxed);
    // CC-OFFNXT(G.EXP.22-CPP): workersCount represents the count of execution units; it's 0 only if the current
    // worker doesn't exist
    size_t countToGet = size / workersCount + 1;
    countToGet = (countToGet >= WorkerThread::WORKER_QUEUE_SIZE) ? WorkerThread::WORKER_QUEUE_SIZE : countToGet;
    // Pop the tasks from the queue
    size_t queueTaskCount = queue->PopTasksToWorker(addForegroundTaskFunc, addBackgroundTaskFunc, countToGet);
    LOG(DEBUG, TASK_MANAGER) << worker->GetWorkerName() << ": get tasks " << queueTaskCount << "; ";
    return queueTaskCount;
}

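// Returns true if every queue in the shared queue set is empty.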
bool TaskScheduler::AreQueuesEmpty() const
{
    return queueSet_->AreQueuesEmpty();
}

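// Returns true if no registered worker holds tasks in its local queues.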
bool TaskScheduler::AreWorkersEmpty() const
{
    // Atomic with relaxed order reason: no order required
    size_t workersCount = workersCount_.load(std::memory_order_relaxed);
    for (size_t i = 0; i < workersCount; i++) {
        // Atomic with relaxed order reason: no order required
        auto *worker = workers_[i].load(std::memory_order_relaxed);
        if (worker != nullptr && !worker->IsEmpty()) {
            return false;
        }
    }
    return true;
}

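// Returns true if no live tasks remain anywhere in the system.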
bool TaskScheduler::AreNoMoreTasks() const
{
    return GetCountOfTasksInSystem() == 0;
}

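// Wakes up one waiting worker, if any worker is currently parked in WaitUntilNewTasks.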
void TaskScheduler::SignalWorkers()
{
    // Atomic with relaxed order reason: no order required
    if (waitWorkersCount_.load(std::memory_order_relaxed) > 0) {
        os::memory::LockHolder lh(taskSchedulerStateLock_);
        queuesWaitCondVar_.Signal();
    }
}

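// Returns the number of live tasks tracked by the queue set.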
size_t TaskScheduler::GetCountOfTasksInSystem() const
{
    return queueSet_->GetCountOfLiveTasks();
}

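// Returns the size of the workers_ container; note that this may differ from the active worker
// count stored in workersCount_ if the container holds unused slots.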
size_t TaskScheduler::GetCountOfWorkers() const
{
    os::memory::LockHolder lh(taskSchedulerStateLock_);
    return workers_.size();
}

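// Resizes the worker pool to count threads. Growing registers newly created WorkerThread
// instances; shrinking asks the excess workers to finish, joins them outside the lock, and
// defers their deletion to the destructor via workersToDelete_.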
void TaskScheduler::SetCountOfWorkers(size_t count)
{
    os::memory::LockHolder lh(taskSchedulerStateLock_);
    // Atomic with relaxed order reason: no order required
    size_t currentCount = workersCount_.load(std::memory_order_relaxed);
    if (count > currentCount) {
        // Atomic with relaxed order reason: no order required
        workersCount_.store(count, std::memory_order_relaxed);
        for (size_t i = currentCount; i < count; i++) {
            auto workerName = "TMWorker_" + std::to_string(i + 1UL);
            // Atomic with relaxed order reason: no order required
            workers_[i].store(new WorkerThread(this, queueSet_->GetTaskTimeStats(), workerName),
                              std::memory_order_relaxed);
            LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: created thread with name " << workerName;
        }
        return;
    }
    if (count < currentCount) {
        std::vector<WorkerThread *> workersToWait;
        for (size_t i = count; i != currentCount; i++) {
            // Atomic with relaxed order reason: no order required
            auto *worker = workers_[i].load(std::memory_order_relaxed);
            worker->SetFinish();
            workersToWait.push_back(worker);
        }
        queuesWaitCondVar_.SignalAll();
        taskSchedulerStateLock_.Unlock();
        for (auto *worker : workersToWait) {
            worker->Join();
        }
        taskSchedulerStateLock_.Lock();
        for (auto *worker : workersToWait) {
            workersToDelete_.push_back(worker);
        }
        // Atomic with relaxed order reason: no order required
        workersCount_.store(count, std::memory_order_relaxed);
        return;
    }
}

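// Stops all remaining workers and deletes the joined threads collected in workersToDelete_.
// The scheduler must not be destroyed while tasks are still live (see the ASSERT below).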
TaskScheduler::~TaskScheduler()
{
    ASSERT(start_);
    ASSERT(AreNoMoreTasks());
    SetCountOfWorkers(0);
    for (auto *worker : workersToDelete_) {
        delete worker;
    }
    LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: Finalized";
}

} // namespace ark::taskmanager::internal