• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "libpandabase/taskmanager/task_scheduler.h"
17 #include <atomic>
18 #include "libpandabase/taskmanager/task_queue_set.h"
19 #include "libpandabase/utils/logger.h"
20 #include "libpandabase/os/mutex.h"
21 
22 namespace ark::taskmanager::internal {
23 
TaskScheduler(size_t workersCount,TaskWaitList * waitList,TaskQueueSet * queueSet)24 TaskScheduler::TaskScheduler(size_t workersCount, TaskWaitList *waitList, TaskQueueSet *queueSet)
25     : waitList_(waitList), queueSet_(queueSet)
26 {
27     ASSERT(!start_);
28     start_ = true;
29     LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: creates " << workersCount << " threads";
30     // Starts all workers
31     SetCountOfWorkers(workersCount);
32 }
33 
StealTaskFromOtherWorker(WorkerThread * taskReceiver)34 size_t TaskScheduler::StealTaskFromOtherWorker(WorkerThread *taskReceiver)
35 {
36     // WorkerThread tries to find Worker with the most tasks
37     // CC-OFFNXT(G.CTL.03): false positive
38     while (true) {
39         // When we start stealing worker is empty, so if it will be chosen, we will leave a method with 0
40         WorkerThread *chosenWorker = taskReceiver;
41         size_t taskCount = 0;
42         // First stage: find worker with most count of tasks
43         // Atomic with relaxed order reason: no order required
44         size_t workersCount = workersCount_.load(std::memory_order_relaxed);
45         for (size_t i = 0; i < workersCount; i++) {
46             auto *worker = workers_[i].load(std::memory_order_relaxed);
47             if (worker != nullptr && taskCount < worker->Size()) {
48                 chosenWorker = worker;
49                 taskCount = worker->Size();
50             }
51         }
52         // Check if chosen worker contains anough tasks
53         if (taskCount <= 1U) {
54             return 0U;
55         }
56         // If worker was successfully found, steals task from its local queue
57         TaskPtr task = chosenWorker->PopForegroundTask();
58         if (task != nullptr) {
59             taskReceiver->AddForegroundTask(std::move(task));
60             return 1U;
61         }
62         task = chosenWorker->PopBackgroundTask();
63         if (task != nullptr) {
64             taskReceiver->AddBackgroundTask(std::move(task));
65             return 1U;
66         }
67         // If getting faild with Pop task we should retry
68     }
69 }
70 
FillWithTasks(WorkerThread * worker)71 bool TaskScheduler::FillWithTasks(WorkerThread *worker)
72 {
73     // CC-OFFNXT(G.CTL.03): false positive
74     while (true) {
75         ProcessWaitList();
76         auto queue = queueSet_->SelectQueue();
77         if (queue != nullptr) {
78             PutTasksInWorker(worker, reinterpret_cast<internal::SchedulableTaskQueueInterface *>(queue));
79             return false;
80         }
81 
82         if (GetCountOfTasksInSystem() != 0U && StealTaskFromOtherWorker(worker) != 0) {
83             return false;
84         }
85 
86         if (WaitUntilNewTasks(worker)) {
87             return true;
88         }
89     }
90 }
91 
ProcessWaitList()92 size_t TaskScheduler::ProcessWaitList()
93 {
94     // Atomic with relaxed order reason: no sync depends
95     if (waitListIsProcessing_.exchange(true, std::memory_order_relaxed)) {
96         return 0;
97     }
98     if (!waitList_->HaveReadyValue()) {
99         // Atomic with relaxed order reason: no sync depends
100         waitListIsProcessing_.store(false, std::memory_order_relaxed);
101         return 0;
102     }
103     auto count = waitList_->ProcessWaitList([](TaskWaitListElem &&waitVal) {
104         auto &[task, taskPoster] = waitVal;
105         taskPoster(std::move(task));
106     });
107     // Atomic with relaxed order reason: no sync depends
108     waitListIsProcessing_.store(false, std::memory_order_relaxed);
109     return count;
110 }
111 
WaitUntilNewTasks(WorkerThread * worker)112 bool TaskScheduler::WaitUntilNewTasks(WorkerThread *worker)
113 {
114     os::memory::LockHolder lh(taskSchedulerStateLock_);
115     // Atomic with relaxed order reason: no sync depends
116     waitWorkersCount_.fetch_add(1, std::memory_order_relaxed);
117     // we don't use while loop here, because WorkerThread should be fast, if it was contified waiting work correct, if
118     // it was not we will retuen to the worker, where we should retry all the checks, so behiaviour is correct.
119     if (AreQueuesEmpty() || !waitList_->HaveReadyValue()) {
120         queuesWaitCondVar_.TimedWait(&taskSchedulerStateLock_, TASK_WAIT_TIMEOUT);
121     }
122     // Atomic with relaxed order reason: no sync depends
123     waitWorkersCount_.fetch_sub(1, std::memory_order_relaxed);
124     return worker->CheckFinish();
125 }
126 
PutTasksInWorker(WorkerThread * worker,internal::SchedulableTaskQueueInterface * queue)127 size_t TaskScheduler::PutTasksInWorker(WorkerThread *worker, internal::SchedulableTaskQueueInterface *queue)
128 {
129     auto addForegroundTaskFunc = [worker](TaskPtr &&task) { worker->AddForegroundTask(std::move(task)); };
130     auto addBackgroundTaskFunc = [worker](TaskPtr &&task) { worker->AddBackgroundTask(std::move(task)); };
131 
132     // Now we calc how many task we want to get from queue. If there are few tasks, then we want them to be evenly
133     // distributed among the workers.
134     size_t size = queue->Size();
135     // Atomic with relaxed order reason: no order required
136     size_t workersCount = workersCount_.load(std::memory_order_relaxed);
137     // CC-OFFNXT(G.EXP.22-CPP): workersCount represents conts of execution units, it's 0 only if currect worker doesn't
138     // exist
139     size_t countToGet = size / workersCount + 1;
140     countToGet = (countToGet >= WorkerThread::WORKER_QUEUE_SIZE) ? WorkerThread::WORKER_QUEUE_SIZE : countToGet;
141     // Execute popping task form queue
142     size_t queueTaskCount = queue->PopTasksToWorker(addForegroundTaskFunc, addBackgroundTaskFunc, countToGet);
143     LOG(DEBUG, TASK_MANAGER) << worker->GetWorkerName() << ": get tasks " << queueTaskCount << "; ";
144     return queueTaskCount;
145 }
146 
AreQueuesEmpty() const147 bool TaskScheduler::AreQueuesEmpty() const
148 {
149     return queueSet_->AreQueuesEmpty();
150 }
151 
AreWorkersEmpty() const152 bool TaskScheduler::AreWorkersEmpty() const
153 {
154     // Atomic with relaxed order reason: no order required
155     size_t workersCount = workersCount_.load(std::memory_order_relaxed);
156     for (size_t i = 0; i < workersCount; i++) {
157         // Atomic with relaxed order reason: no order required
158         auto *worker = workers_[i].load(std::memory_order_relaxed);
159         if (worker != nullptr && !worker->IsEmpty()) {
160             return false;
161         }
162     }
163     return true;
164 }
165 
AreNoMoreTasks() const166 bool TaskScheduler::AreNoMoreTasks() const
167 {
168     return GetCountOfTasksInSystem() == 0;
169 }
170 
SignalWorkers()171 void TaskScheduler::SignalWorkers()
172 {
173     // Atomic with relaxed order reason: no order required
174     if (waitWorkersCount_.load(std::memory_order_relaxed) > 0) {
175         os::memory::LockHolder lh(taskSchedulerStateLock_);
176         queuesWaitCondVar_.Signal();
177     }
178 }
179 
GetCountOfTasksInSystem() const180 size_t TaskScheduler::GetCountOfTasksInSystem() const
181 {
182     return queueSet_->GetCountOfLiveTasks();
183 }
184 
GetCountOfWorkers() const185 size_t TaskScheduler::GetCountOfWorkers() const
186 {
187     os::memory::LockHolder lh(taskSchedulerStateLock_);
188     return workers_.size();
189 }
190 
SetCountOfWorkers(size_t count)191 void TaskScheduler::SetCountOfWorkers(size_t count)
192 {
193     os::memory::LockHolder lh(taskSchedulerStateLock_);
194     // Atomic with relaxed order reason: no order required
195     size_t currentCount = workersCount_.load(std::memory_order_relaxed);
196     if (count > currentCount) {
197         // Atomic with relaxed order reason: no order required
198         workersCount_.store(count, std::memory_order_relaxed);
199         for (size_t i = currentCount; i < count; i++) {
200             auto workerName = "TMWorker_" + std::to_string(i + 1UL);
201             // Atomic with relaxed order reason: no order required
202             workers_[i].store(new WorkerThread(this, queueSet_->GetTaskTimeStats(), workerName),
203                               std::memory_order_relaxed);
204             LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: created thread with name " << workerName;
205         }
206         return;
207     }
208     if (count < currentCount) {
209         std::vector<WorkerThread *> workersToWait;
210         for (size_t i = count; i != currentCount; i++) {
211             // Atomic with relaxed order reason: no order required
212             auto *worker = workers_[i].load(std::memory_order_relaxed);
213             worker->SetFinish();
214             workersToWait.push_back(worker);
215         }
216         queuesWaitCondVar_.SignalAll();
217         taskSchedulerStateLock_.Unlock();
218         for (auto *worker : workersToWait) {
219             worker->Join();
220         }
221         taskSchedulerStateLock_.Lock();
222         for (auto *worker : workersToWait) {
223             workersToDelete_.push_back(worker);
224         }
225         // Atomic with relaxed order reason: no order required
226         workersCount_.store(count, std::memory_order_relaxed);
227         return;
228     }
229 }
230 
~TaskScheduler()231 TaskScheduler::~TaskScheduler()
232 {
233     ASSERT(start_);
234     ASSERT(AreNoMoreTasks());
235     SetCountOfWorkers(0);
236     for (auto *worker : workersToDelete_) {
237         delete worker;
238     }
239     LOG(DEBUG, TASK_MANAGER) << "TaskScheduler: Finalized";
240 }
241 
242 }  // namespace ark::taskmanager::internal
243