• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <cstring>
17 #include <sys/stat.h>
18 #include "qos.h"
19 #include "dfx/perf/ffrt_perf.h"
20 #include "dfx/trace_record/ffrt_trace_record.h"
21 #include "eu/cpu_monitor.h"
22 #include "eu/cpu_manager_strategy.h"
23 #include "sched/scheduler.h"
24 #include "sched/workgroup_internal.h"
25 #include "eu/qos_interface.h"
26 #include "eu/cpuworker_manager.h"
27 #include "util/ffrt_facade.h"
28 #ifdef FFRT_WORKER_MONITOR
29 #include "util/ffrt_facade.h"
30 #endif
31 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
32 #include "eu/blockaware.h"
33 #endif
34 
35 namespace {
InsertTask(void * task,int qos)36 void InsertTask(void* task, int qos)
37 {
38     ffrt_executor_task_t* executorTask = reinterpret_cast<ffrt_executor_task_t*>(task);
39     ffrt::LinkedList* node = reinterpret_cast<ffrt::LinkedList*>(&executorTask->wq);
40     if (!ffrt::FFRTFacade::GetSchedInstance()->InsertNode(node, qos)) {
41         FFRT_LOGE("Insert task failed.");
42     }
43 }
44 }
45 
46 namespace ffrt {
IncWorker(const QoS & qos)47 bool CPUWorkerManager::IncWorker(const QoS& qos)
48 {
49     QoS localQos = qos;
50     int workerQos = localQos();
51     if (workerQos < 0 || workerQos >= QoS::MaxNum()) {
52         FFRT_LOGE("IncWorker qos:%d is invaild", workerQos);
53         return false;
54     }
55     std::unique_lock<std::shared_mutex> lock(groupCtl[workerQos].tgMutex);
56     if (tearDown) {
57         FFRT_LOGE("CPU Worker Manager exit");
58         return false;
59     }
60 
61     auto worker = CPUManagerStrategy::CreateCPUWorker(localQos, this);
62     auto uniqueWorker = std::unique_ptr<WorkerThread>(worker);
63     if (uniqueWorker == nullptr || uniqueWorker->Exited()) {
64         FFRT_LOGE("IncWorker failed: worker is nullptr or has exited\n");
65         return false;
66     }
67     uniqueWorker->WorkerSetup(worker);
68     auto result = groupCtl[workerQos].threads.emplace(worker, std::move(uniqueWorker));
69     if (!result.second) {
70         FFRT_LOGE("qos:%d worker insert fail:%d", workerQos, result.second);
71         return false;
72     }
73     FFRT_PERF_WORKER_WAKE(workerQos);
74     lock.unlock();
75 #ifdef FFRT_WORKER_MONITOR
76     FFRTFacade::GetWMInstance().SubmitTask();
77 #endif
78     FFRTTraceRecord::UseFfrt();
79     FFRT_LOGI("qos[%d]", workerQos);
80     return true;
81 }
82 
GetTaskCount(const QoS & qos)83 int CPUWorkerManager::GetTaskCount(const QoS& qos)
84 {
85     auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(qos);
86     return sched.RQSize();
87 }
88 
GetWorkerCount(const QoS & qos)89 int CPUWorkerManager::GetWorkerCount(const QoS& qos)
90 {
91     std::shared_lock<std::shared_mutex> lck(groupCtl[qos()].tgMutex);
92     return groupCtl[qos()].threads.size();
93 }
94 
95 // pick task from global queue (per qos)
PickUpTaskFromGlobalQueue(WorkerThread * thread)96 CPUEUTask* CPUWorkerManager::PickUpTaskFromGlobalQueue(WorkerThread* thread)
97 {
98     if (tearDown) {
99         return nullptr;
100     }
101 
102     auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
103     auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
104     std::lock_guard lg(*lock);
105     return sched.PickNextTask();
106 }
107 
108 // pick task from local queue (per worker)
PickUpTaskFromLocalQueue(WorkerThread * thread)109 CPUEUTask* CPUWorkerManager::PickUpTaskFromLocalQueue(WorkerThread* thread)
110 {
111     if (tearDown) {
112         return nullptr;
113     }
114 
115     CPUWorker* worker = reinterpret_cast<CPUWorker*>(thread);
116     void* task = worker->localFifo.PopHead();
117     return reinterpret_cast<CPUEUTask*>(task);
118 }
119 
PickUpTaskBatch(WorkerThread * thread)120 CPUEUTask* CPUWorkerManager::PickUpTaskBatch(WorkerThread* thread)
121 {
122     if (tearDown) {
123         return nullptr;
124     }
125 
126     auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
127     auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
128     std::lock_guard lg(*lock);
129     CPUEUTask* task = sched.PickNextTask();
130     if (task == nullptr) {
131         return nullptr;
132     }
133 
134     int wakedWorkerNum = monitor->WakedWorkerNum(thread->GetQos());
135     // when there is only one worker, the global queue is equivalent to the local queue
136     // prevents local queue tasks that cannot be executed due to blocking tasks
137     if (wakedWorkerNum <= 1) {
138         return task;
139     }
140 
141     SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(thread)->localFifo);
142     int expectedTask = GetTaskCount(thread->GetQos()) / wakedWorkerNum - 1;
143     for (int i = 0; i < expectedTask; i++) {
144         if (queue->GetLength() == queue->GetCapacity()) {
145             return task;
146         }
147 
148         CPUEUTask* task2local = sched.PickNextTask();
149         if (task2local == nullptr) {
150             return task;
151         }
152 
153         queue->PushTail(task2local);
154     }
155 
156     return task;
157 }
158 
StealTaskBatch(WorkerThread * thread)159 unsigned int CPUWorkerManager::StealTaskBatch(WorkerThread* thread)
160 {
161     if (tearDown) {
162         return 0;
163     }
164 
165     if (GetStealingWorkers(thread->GetQos()) > groupCtl[thread->GetQos()].threads.size() / 2) {
166         return 0;
167     }
168 
169     std::shared_lock<std::shared_mutex> lck(groupCtl[thread->GetQos()].tgMutex);
170     AddStealingWorker(thread->GetQos());
171     std::unordered_map<WorkerThread*, std::unique_ptr<WorkerThread>>::iterator iter =
172         groupCtl[thread->GetQos()].threads.begin();
173     while (iter != groupCtl[thread->GetQos()].threads.end()) {
174         SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(iter->first)->localFifo);
175         unsigned int queueLen = queue->GetLength();
176         if (iter->first != thread && queueLen > 0) {
177             unsigned int popLen = queue->PopHeadToAnotherQueue(
178                 reinterpret_cast<CPUWorker*>(thread)->localFifo, (queueLen + 1) / 2, thread->GetQos(), InsertTask);
179             SubStealingWorker(thread->GetQos());
180             return popLen;
181         }
182         iter++;
183     }
184     SubStealingWorker(thread->GetQos());
185     return 0;
186 }
187 
TryPoll(const WorkerThread * thread,int timeout)188 PollerRet CPUWorkerManager::TryPoll(const WorkerThread* thread, int timeout)
189 {
190     if (tearDown || FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap()) {
191         return PollerRet::RET_NULL;
192     }
193     auto& pollerMtx = pollersMtx[thread->GetQos()];
194     if (pollerMtx.try_lock()) {
195         polling_[thread->GetQos()] = 1;
196         if (timeout == -1) {
197             monitor->IntoPollWait(thread->GetQos());
198         }
199         PollerRet ret = FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).PollOnce(timeout);
200         if (timeout == -1) {
201             monitor->OutOfPollWait(thread->GetQos());
202         }
203         polling_[thread->GetQos()] = 0;
204         pollerMtx.unlock();
205         return ret;
206     }
207     return PollerRet::RET_NULL;
208 }
209 
NotifyLocalTaskAdded(const QoS & qos)210 void CPUWorkerManager::NotifyLocalTaskAdded(const QoS& qos)
211 {
212     if (stealWorkers[qos()].load(std::memory_order_relaxed) == 0) {
213         monitor->Notify(qos, TaskNotifyType::TASK_LOCAL);
214     }
215 }
216 
NotifyTaskPicked(const WorkerThread * thread)217 void CPUWorkerManager::NotifyTaskPicked(const WorkerThread* thread)
218 {
219     monitor->Notify(thread->GetQos(), TaskNotifyType::TASK_PICKED);
220 }
221 
WorkerRetired(WorkerThread * thread)222 void CPUWorkerManager::WorkerRetired(WorkerThread* thread)
223 {
224     pid_t pid = thread->Id();
225     int qos = static_cast<int>(thread->GetQos());
226 
227     {
228         std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
229         thread->SetExited(true);
230         thread->Detach();
231         auto worker = std::move(groupCtl[qos].threads[thread]);
232         int ret = groupCtl[qos].threads.erase(thread);
233         if (ret != 1) {
234             FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
235         }
236         WorkerLeaveTg(qos, pid);
237 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
238         if (IsBlockAwareInit()) {
239             ret = BlockawareUnregister();
240             if (ret != 0) {
241                 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
242             }
243         }
244 #endif
245         worker = nullptr;
246     }
247 }
248 
NotifyTaskAdded(const QoS & qos)249 void CPUWorkerManager::NotifyTaskAdded(const QoS& qos)
250 {
251     monitor->Notify(qos, TaskNotifyType::TASK_ADDED);
252 }
253 
NotifyWorkers(const QoS & qos,int number)254 void CPUWorkerManager::NotifyWorkers(const QoS& qos, int number)
255 {
256     monitor->NotifyWorkers(qos, number);
257 }
258 
CPUWorkerManager()259 CPUWorkerManager::CPUWorkerManager()
260 {
261     groupCtl[qos_deadline_request].tg = std::make_unique<ThreadGroup>();
262 }
263 
WorkerJoinTg(const QoS & qos,pid_t pid)264 void CPUWorkerManager::WorkerJoinTg(const QoS& qos, pid_t pid)
265 {
266     std::shared_lock<std::shared_mutex> lock(groupCtl[qos()].tgMutex);
267     if (qos == qos_user_interactive) {
268         (void)JoinWG(pid);
269         return;
270     }
271     auto& tgwrap = groupCtl[qos()];
272     if (!tgwrap.tg) {
273         return;
274     }
275 
276     if ((tgwrap.tgRefCount) == 0) {
277         return;
278     }
279 
280     tgwrap.tg->Join(pid);
281 }
282 
WorkerLeaveTg(const QoS & qos,pid_t pid)283 void CPUWorkerManager::WorkerLeaveTg(const QoS& qos, pid_t pid)
284 {
285     if (qos == qos_user_interactive) {
286         (void)LeaveWG(pid);
287         return;
288     }
289     auto& tgwrap = groupCtl[qos()];
290     if (!tgwrap.tg) {
291         return;
292     }
293 
294     if ((tgwrap.tgRefCount) == 0) {
295         return;
296     }
297 
298     tgwrap.tg->Leave(pid);
299 }
300 
301 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
IsExceedRunningThreshold(const WorkerThread * thread)302 bool CPUWorkerManager::IsExceedRunningThreshold(const WorkerThread* thread)
303 {
304     return monitor->IsExceedRunningThreshold(thread->GetQos());
305 }
306 
IsBlockAwareInit()307 bool CPUWorkerManager::IsBlockAwareInit()
308 {
309     return monitor->IsBlockAwareInit();
310 }
311 #endif
312 } // namespace ffrt
313