1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <cstring>
17 #include <sys/stat.h>
18 #include "qos.h"
19 #include "dfx/perf/ffrt_perf.h"
20 #include "dfx/trace_record/ffrt_trace_record.h"
21 #include "eu/cpu_monitor.h"
22 #include "eu/cpu_manager_strategy.h"
23 #include "sched/scheduler.h"
24 #include "sched/workgroup_internal.h"
25 #include "eu/qos_interface.h"
26 #include "eu/cpuworker_manager.h"
27 #include "util/ffrt_facade.h"
28 #ifdef FFRT_WORKER_MONITOR
29 #include "util/ffrt_facade.h"
30 #endif
31 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
32 #include "eu/blockaware.h"
33 #endif
34
35 namespace {
InsertTask(void * task,int qos)36 void InsertTask(void* task, int qos)
37 {
38 ffrt_executor_task_t* executorTask = reinterpret_cast<ffrt_executor_task_t*>(task);
39 ffrt::LinkedList* node = reinterpret_cast<ffrt::LinkedList*>(&executorTask->wq);
40 if (!ffrt::FFRTFacade::GetSchedInstance()->InsertNode(node, qos)) {
41 FFRT_LOGE("Insert task failed.");
42 }
43 }
44 }
45
46 namespace ffrt {
IncWorker(const QoS & qos)47 bool CPUWorkerManager::IncWorker(const QoS& qos)
48 {
49 QoS localQos = qos;
50 int workerQos = localQos();
51 if (workerQos < 0 || workerQos >= QoS::MaxNum()) {
52 FFRT_LOGE("IncWorker qos:%d is invaild", workerQos);
53 return false;
54 }
55 std::unique_lock<std::shared_mutex> lock(groupCtl[workerQos].tgMutex);
56 if (tearDown) {
57 FFRT_LOGE("CPU Worker Manager exit");
58 return false;
59 }
60
61 auto worker = CPUManagerStrategy::CreateCPUWorker(localQos, this);
62 auto uniqueWorker = std::unique_ptr<WorkerThread>(worker);
63 if (uniqueWorker == nullptr || uniqueWorker->Exited()) {
64 FFRT_LOGE("IncWorker failed: worker is nullptr or has exited\n");
65 return false;
66 }
67 uniqueWorker->WorkerSetup(worker);
68 auto result = groupCtl[workerQos].threads.emplace(worker, std::move(uniqueWorker));
69 if (!result.second) {
70 FFRT_LOGE("qos:%d worker insert fail:%d", workerQos, result.second);
71 return false;
72 }
73 FFRT_PERF_WORKER_WAKE(workerQos);
74 lock.unlock();
75 #ifdef FFRT_WORKER_MONITOR
76 FFRTFacade::GetWMInstance().SubmitTask();
77 #endif
78 FFRTTraceRecord::UseFfrt();
79 FFRT_LOGI("qos[%d]", workerQos);
80 return true;
81 }
82
GetTaskCount(const QoS & qos)83 int CPUWorkerManager::GetTaskCount(const QoS& qos)
84 {
85 auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(qos);
86 return sched.RQSize();
87 }
88
GetWorkerCount(const QoS & qos)89 int CPUWorkerManager::GetWorkerCount(const QoS& qos)
90 {
91 std::shared_lock<std::shared_mutex> lck(groupCtl[qos()].tgMutex);
92 return groupCtl[qos()].threads.size();
93 }
94
95 // pick task from global queue (per qos)
PickUpTaskFromGlobalQueue(WorkerThread * thread)96 CPUEUTask* CPUWorkerManager::PickUpTaskFromGlobalQueue(WorkerThread* thread)
97 {
98 if (tearDown) {
99 return nullptr;
100 }
101
102 auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
103 auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
104 std::lock_guard lg(*lock);
105 return sched.PickNextTask();
106 }
107
108 // pick task from local queue (per worker)
PickUpTaskFromLocalQueue(WorkerThread * thread)109 CPUEUTask* CPUWorkerManager::PickUpTaskFromLocalQueue(WorkerThread* thread)
110 {
111 if (tearDown) {
112 return nullptr;
113 }
114
115 CPUWorker* worker = reinterpret_cast<CPUWorker*>(thread);
116 void* task = worker->localFifo.PopHead();
117 return reinterpret_cast<CPUEUTask*>(task);
118 }
119
PickUpTaskBatch(WorkerThread * thread)120 CPUEUTask* CPUWorkerManager::PickUpTaskBatch(WorkerThread* thread)
121 {
122 if (tearDown) {
123 return nullptr;
124 }
125
126 auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
127 auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
128 std::lock_guard lg(*lock);
129 CPUEUTask* task = sched.PickNextTask();
130 if (task == nullptr) {
131 return nullptr;
132 }
133
134 int wakedWorkerNum = monitor->WakedWorkerNum(thread->GetQos());
135 // when there is only one worker, the global queue is equivalent to the local queue
136 // prevents local queue tasks that cannot be executed due to blocking tasks
137 if (wakedWorkerNum <= 1) {
138 return task;
139 }
140
141 SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(thread)->localFifo);
142 int expectedTask = GetTaskCount(thread->GetQos()) / wakedWorkerNum - 1;
143 for (int i = 0; i < expectedTask; i++) {
144 if (queue->GetLength() == queue->GetCapacity()) {
145 return task;
146 }
147
148 CPUEUTask* task2local = sched.PickNextTask();
149 if (task2local == nullptr) {
150 return task;
151 }
152
153 queue->PushTail(task2local);
154 }
155
156 return task;
157 }
158
StealTaskBatch(WorkerThread * thread)159 unsigned int CPUWorkerManager::StealTaskBatch(WorkerThread* thread)
160 {
161 if (tearDown) {
162 return 0;
163 }
164
165 if (GetStealingWorkers(thread->GetQos()) > groupCtl[thread->GetQos()].threads.size() / 2) {
166 return 0;
167 }
168
169 std::shared_lock<std::shared_mutex> lck(groupCtl[thread->GetQos()].tgMutex);
170 AddStealingWorker(thread->GetQos());
171 std::unordered_map<WorkerThread*, std::unique_ptr<WorkerThread>>::iterator iter =
172 groupCtl[thread->GetQos()].threads.begin();
173 while (iter != groupCtl[thread->GetQos()].threads.end()) {
174 SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(iter->first)->localFifo);
175 unsigned int queueLen = queue->GetLength();
176 if (iter->first != thread && queueLen > 0) {
177 unsigned int popLen = queue->PopHeadToAnotherQueue(
178 reinterpret_cast<CPUWorker*>(thread)->localFifo, (queueLen + 1) / 2, thread->GetQos(), InsertTask);
179 SubStealingWorker(thread->GetQos());
180 return popLen;
181 }
182 iter++;
183 }
184 SubStealingWorker(thread->GetQos());
185 return 0;
186 }
187
TryPoll(const WorkerThread * thread,int timeout)188 PollerRet CPUWorkerManager::TryPoll(const WorkerThread* thread, int timeout)
189 {
190 if (tearDown || FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap()) {
191 return PollerRet::RET_NULL;
192 }
193 auto& pollerMtx = pollersMtx[thread->GetQos()];
194 if (pollerMtx.try_lock()) {
195 polling_[thread->GetQos()] = 1;
196 if (timeout == -1) {
197 monitor->IntoPollWait(thread->GetQos());
198 }
199 PollerRet ret = FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).PollOnce(timeout);
200 if (timeout == -1) {
201 monitor->OutOfPollWait(thread->GetQos());
202 }
203 polling_[thread->GetQos()] = 0;
204 pollerMtx.unlock();
205 return ret;
206 }
207 return PollerRet::RET_NULL;
208 }
209
NotifyLocalTaskAdded(const QoS & qos)210 void CPUWorkerManager::NotifyLocalTaskAdded(const QoS& qos)
211 {
212 if (stealWorkers[qos()].load(std::memory_order_relaxed) == 0) {
213 monitor->Notify(qos, TaskNotifyType::TASK_LOCAL);
214 }
215 }
216
NotifyTaskPicked(const WorkerThread * thread)217 void CPUWorkerManager::NotifyTaskPicked(const WorkerThread* thread)
218 {
219 monitor->Notify(thread->GetQos(), TaskNotifyType::TASK_PICKED);
220 }
221
WorkerRetired(WorkerThread * thread)222 void CPUWorkerManager::WorkerRetired(WorkerThread* thread)
223 {
224 pid_t pid = thread->Id();
225 int qos = static_cast<int>(thread->GetQos());
226
227 {
228 std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
229 thread->SetExited(true);
230 thread->Detach();
231 auto worker = std::move(groupCtl[qos].threads[thread]);
232 int ret = groupCtl[qos].threads.erase(thread);
233 if (ret != 1) {
234 FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
235 }
236 WorkerLeaveTg(qos, pid);
237 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
238 if (IsBlockAwareInit()) {
239 ret = BlockawareUnregister();
240 if (ret != 0) {
241 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
242 }
243 }
244 #endif
245 worker = nullptr;
246 }
247 }
248
NotifyTaskAdded(const QoS & qos)249 void CPUWorkerManager::NotifyTaskAdded(const QoS& qos)
250 {
251 monitor->Notify(qos, TaskNotifyType::TASK_ADDED);
252 }
253
NotifyWorkers(const QoS & qos,int number)254 void CPUWorkerManager::NotifyWorkers(const QoS& qos, int number)
255 {
256 monitor->NotifyWorkers(qos, number);
257 }
258
CPUWorkerManager()259 CPUWorkerManager::CPUWorkerManager()
260 {
261 groupCtl[qos_deadline_request].tg = std::make_unique<ThreadGroup>();
262 }
263
WorkerJoinTg(const QoS & qos,pid_t pid)264 void CPUWorkerManager::WorkerJoinTg(const QoS& qos, pid_t pid)
265 {
266 std::shared_lock<std::shared_mutex> lock(groupCtl[qos()].tgMutex);
267 if (qos == qos_user_interactive) {
268 (void)JoinWG(pid);
269 return;
270 }
271 auto& tgwrap = groupCtl[qos()];
272 if (!tgwrap.tg) {
273 return;
274 }
275
276 if ((tgwrap.tgRefCount) == 0) {
277 return;
278 }
279
280 tgwrap.tg->Join(pid);
281 }
282
WorkerLeaveTg(const QoS & qos,pid_t pid)283 void CPUWorkerManager::WorkerLeaveTg(const QoS& qos, pid_t pid)
284 {
285 if (qos == qos_user_interactive) {
286 (void)LeaveWG(pid);
287 return;
288 }
289 auto& tgwrap = groupCtl[qos()];
290 if (!tgwrap.tg) {
291 return;
292 }
293
294 if ((tgwrap.tgRefCount) == 0) {
295 return;
296 }
297
298 tgwrap.tg->Leave(pid);
299 }
300
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
/**
 * Asks the monitor whether the running-threshold check is exceeded for @p thread's QoS.
 */
bool CPUWorkerManager::IsExceedRunningThreshold(const WorkerThread* thread)
{
    const auto& threadQos = thread->GetQos();
    return monitor->IsExceedRunningThreshold(threadQos);
}

/**
 * Asks the monitor whether its blockaware support has been initialized.
 */
bool CPUWorkerManager::IsBlockAwareInit()
{
    return this->monitor->IsBlockAwareInit();
}
#endif
312 } // namespace ffrt
313