1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <cstring>
17 #include <sys/stat.h>
18 #include "qos.h"
19 #include "dfx/perf/ffrt_perf.h"
20 #include "dfx/trace_record/ffrt_trace_record.h"
21 #include "eu/cpu_monitor.h"
22 #include "eu/cpu_manager_strategy.h"
23 #include "sched/scheduler.h"
24 #include "sched/workgroup_internal.h"
25 #include "eu/qos_interface.h"
26 #include "eu/cpuworker_manager.h"
27 #include "util/ffrt_facade.h"
28 #ifdef FFRT_WORKER_MONITOR
29 #include "util/ffrt_facade.h"
30 #endif
31 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
32 #include "eu/blockaware.h"
33 #endif
34
35 namespace {
InsertTask(void * task,int qos)36 void InsertTask(void* task, int qos)
37 {
38 ffrt_executor_task_t* executorTask = reinterpret_cast<ffrt_executor_task_t*>(task);
39 ffrt::LinkedList* node = reinterpret_cast<ffrt::LinkedList*>(&executorTask->wq);
40 if (!ffrt::FFRTFacade::GetSchedInstance()->InsertNode(node, qos)) {
41 FFRT_LOGE("Insert task failed.");
42 }
43 }
44 }
45
46 namespace ffrt {
IncWorker(const QoS & qos)47 bool CPUWorkerManager::IncWorker(const QoS& qos)
48 {
49 QoS localQos = qos;
50 int workerQos = localQos();
51 if (workerQos < 0 || workerQos >= QoS::MaxNum()) {
52 FFRT_LOGE("IncWorker qos:%d is invaild", workerQos);
53 return false;
54 }
55 std::unique_lock<std::shared_mutex> lock(groupCtl[workerQos].tgMutex);
56 if (tearDown) {
57 FFRT_LOGE("CPU Worker Manager exit");
58 return false;
59 }
60
61 workerNum.fetch_add(1);
62 auto worker = CPUManagerStrategy::CreateCPUWorker(localQos, this);
63 auto uniqueWorker = std::unique_ptr<WorkerThread>(worker);
64 if (uniqueWorker == nullptr || uniqueWorker->Exited()) {
65 workerNum.fetch_sub(1);
66 FFRT_LOGE("IncWorker failed: worker is nullptr or has exited\n");
67 return false;
68 }
69 uniqueWorker->WorkerSetup(worker);
70 auto result = groupCtl[workerQos].threads.emplace(worker, std::move(uniqueWorker));
71 if (!result.second) {
72 FFRT_LOGE("qos:%d worker insert fail:%d", workerQos, result.second);
73 return false;
74 }
75 lock.unlock();
76 #ifdef FFRT_WORKER_MONITOR
77 FFRTFacade::GetWMInstance().SubmitTask();
78 #endif
79 FFRTTraceRecord::UseFfrt();
80 return true;
81 }
82
GetTaskCount(const QoS & qos)83 int CPUWorkerManager::GetTaskCount(const QoS& qos)
84 {
85 auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(qos);
86 return sched.RQSize();
87 }
88
GetWorkerCount(const QoS & qos)89 int CPUWorkerManager::GetWorkerCount(const QoS& qos)
90 {
91 std::shared_lock<std::shared_mutex> lck(groupCtl[qos()].tgMutex);
92 return groupCtl[qos()].threads.size();
93 }
94
StealTaskBatch(WorkerThread * thread)95 unsigned int CPUWorkerManager::StealTaskBatch(WorkerThread* thread)
96 {
97 if (tearDown) {
98 return 0;
99 }
100
101 std::shared_lock<std::shared_mutex> lck(groupCtl[thread->GetQos()].tgMutex);
102 if (GetStealingWorkers(thread->GetQos()) > groupCtl[thread->GetQos()].threads.size() / 2) {
103 return 0;
104 }
105
106 AddStealingWorker(thread->GetQos());
107 std::unordered_map<WorkerThread*, std::unique_ptr<WorkerThread>>::iterator iter =
108 groupCtl[thread->GetQos()].threads.begin();
109 while (iter != groupCtl[thread->GetQos()].threads.end()) {
110 SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(iter->first)->localFifo);
111 unsigned int queueLen = queue->GetLength();
112 if (iter->first != thread && queueLen > 0) {
113 unsigned int popLen = queue->PopHeadToAnotherQueue(
114 reinterpret_cast<CPUWorker*>(thread)->localFifo, (queueLen + 1) / 2, thread->GetQos(), InsertTask);
115 SubStealingWorker(thread->GetQos());
116 return popLen;
117 }
118 iter++;
119 }
120 SubStealingWorker(thread->GetQos());
121 return 0;
122 }
123
TryPoll(const WorkerThread * thread,int timeout)124 PollerRet CPUWorkerManager::TryPoll(const WorkerThread* thread, int timeout)
125 {
126 if (tearDown || FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap()) {
127 return PollerRet::RET_NULL;
128 }
129 auto& pollerMtx = pollersMtx[thread->GetQos()];
130 if (pollerMtx.try_lock()) {
131 polling_[thread->GetQos()] = 1;
132 if (timeout == -1) {
133 monitor->IntoPollWait(thread->GetQos());
134 }
135 PollerRet ret = FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).PollOnce(timeout);
136 if (timeout == -1) {
137 monitor->OutOfPollWait(thread->GetQos());
138 }
139 polling_[thread->GetQos()] = 0;
140 pollerMtx.unlock();
141 return ret;
142 }
143 return PollerRet::RET_NULL;
144 }
145
NotifyLocalTaskAdded(const QoS & qos)146 void CPUWorkerManager::NotifyLocalTaskAdded(const QoS& qos)
147 {
148 if (stealWorkers[qos()].load(std::memory_order_relaxed) == 0) {
149 monitor->Notify(qos, TaskNotifyType::TASK_LOCAL);
150 }
151 }
152
NotifyTaskPicked(const WorkerThread * thread)153 void CPUWorkerManager::NotifyTaskPicked(const WorkerThread* thread)
154 {
155 monitor->Notify(thread->GetQos(), TaskNotifyType::TASK_PICKED);
156 }
157
WorkerRetired(WorkerThread * thread)158 void CPUWorkerManager::WorkerRetired(WorkerThread* thread)
159 {
160 pid_t pid = thread->Id();
161 int qos = static_cast<int>(thread->GetQos());
162
163 {
164 std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
165 thread->SetExited(true);
166 thread->Detach();
167 auto worker = std::move(groupCtl[qos].threads[thread]);
168 int ret = groupCtl[qos].threads.erase(thread);
169 if (ret != 1) {
170 FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
171 }
172 WorkerLeaveTg(qos, pid);
173 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
174 if (IsBlockAwareInit()) {
175 ret = BlockawareUnregister();
176 if (ret != 0) {
177 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
178 }
179 }
180 #endif
181 worker = nullptr;
182 }
183 workerNum.fetch_sub(1);
184 }
185
NotifyTaskAdded(const QoS & qos)186 void CPUWorkerManager::NotifyTaskAdded(const QoS& qos)
187 {
188 monitor->Notify(qos, TaskNotifyType::TASK_ADDED);
189 }
190
NotifyWorkers(const QoS & qos,int number)191 void CPUWorkerManager::NotifyWorkers(const QoS& qos, int number)
192 {
193 monitor->NotifyWorkers(qos, number);
194 }
195
CPUWorkerManager()196 CPUWorkerManager::CPUWorkerManager()
197 {
198 groupCtl[qos_deadline_request].tg = std::make_unique<ThreadGroup>();
199 }
200
WorkerJoinTg(const QoS & qos,pid_t pid)201 void CPUWorkerManager::WorkerJoinTg(const QoS& qos, pid_t pid)
202 {
203 std::shared_lock<std::shared_mutex> lock(groupCtl[qos()].tgMutex);
204 if (qos == qos_user_interactive) {
205 (void)JoinWG(pid);
206 return;
207 }
208 auto& tgwrap = groupCtl[qos()];
209 if (!tgwrap.tg) {
210 return;
211 }
212
213 if ((tgwrap.tgRefCount) == 0) {
214 return;
215 }
216
217 tgwrap.tg->Join(pid);
218 }
219
WorkerLeaveTg(const QoS & qos,pid_t pid)220 void CPUWorkerManager::WorkerLeaveTg(const QoS& qos, pid_t pid)
221 {
222 if (qos == qos_user_interactive) {
223 (void)LeaveWG(pid);
224 return;
225 }
226 auto& tgwrap = groupCtl[qos()];
227 if (!tgwrap.tg) {
228 return;
229 }
230
231 if ((tgwrap.tgRefCount) == 0) {
232 return;
233 }
234
235 tgwrap.tg->Leave(pid);
236 }
237
238 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
IsExceedRunningThreshold(const WorkerThread * thread)239 bool CPUWorkerManager::IsExceedRunningThreshold(const WorkerThread* thread)
240 {
241 return monitor->IsExceedRunningThreshold(thread->GetQos());
242 }
243
IsBlockAwareInit()244 bool CPUWorkerManager::IsBlockAwareInit()
245 {
246 return monitor->IsBlockAwareInit();
247 }
248 #endif
249 } // namespace ffrt
250