• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <cstring>
17 #include <sys/stat.h>
18 #include "qos.h"
19 #include "dfx/perf/ffrt_perf.h"
20 #include "dfx/trace_record/ffrt_trace_record.h"
21 #include "eu/cpu_monitor.h"
22 #include "eu/cpu_manager_strategy.h"
23 #include "sched/scheduler.h"
24 #include "sched/workgroup_internal.h"
25 #include "eu/qos_interface.h"
26 #include "eu/cpuworker_manager.h"
27 #include "util/ffrt_facade.h"
28 #ifdef FFRT_WORKER_MONITOR
29 #include "util/ffrt_facade.h"
30 #endif
31 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
32 #include "eu/blockaware.h"
33 #endif
34 
35 namespace {
InsertTask(void * task,int qos)36 void InsertTask(void* task, int qos)
37 {
38     ffrt_executor_task_t* executorTask = reinterpret_cast<ffrt_executor_task_t*>(task);
39     ffrt::LinkedList* node = reinterpret_cast<ffrt::LinkedList*>(&executorTask->wq);
40     if (!ffrt::FFRTFacade::GetSchedInstance()->InsertNode(node, qos)) {
41         FFRT_LOGE("Insert task failed.");
42     }
43 }
44 }
45 
46 namespace ffrt {
IncWorker(const QoS & qos)47 bool CPUWorkerManager::IncWorker(const QoS& qos)
48 {
49     QoS localQos = qos;
50     int workerQos = localQos();
51     if (workerQos < 0 || workerQos >= QoS::MaxNum()) {
52         FFRT_LOGE("IncWorker qos:%d is invaild", workerQos);
53         return false;
54     }
55     std::unique_lock<std::shared_mutex> lock(groupCtl[workerQos].tgMutex);
56     if (tearDown) {
57         FFRT_LOGE("CPU Worker Manager exit");
58         return false;
59     }
60 
61     workerNum.fetch_add(1);
62     auto worker = CPUManagerStrategy::CreateCPUWorker(localQos, this);
63     auto uniqueWorker = std::unique_ptr<WorkerThread>(worker);
64     if (uniqueWorker == nullptr || uniqueWorker->Exited()) {
65         workerNum.fetch_sub(1);
66         FFRT_LOGE("IncWorker failed: worker is nullptr or has exited\n");
67         return false;
68     }
69     uniqueWorker->WorkerSetup(worker);
70     auto result = groupCtl[workerQos].threads.emplace(worker, std::move(uniqueWorker));
71     if (!result.second) {
72         FFRT_LOGE("qos:%d worker insert fail:%d", workerQos, result.second);
73         return false;
74     }
75     lock.unlock();
76 #ifdef FFRT_WORKER_MONITOR
77     FFRTFacade::GetWMInstance().SubmitTask();
78 #endif
79     FFRTTraceRecord::UseFfrt();
80     return true;
81 }
82 
GetTaskCount(const QoS & qos)83 int CPUWorkerManager::GetTaskCount(const QoS& qos)
84 {
85     auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(qos);
86     return sched.RQSize();
87 }
88 
GetWorkerCount(const QoS & qos)89 int CPUWorkerManager::GetWorkerCount(const QoS& qos)
90 {
91     std::shared_lock<std::shared_mutex> lck(groupCtl[qos()].tgMutex);
92     return groupCtl[qos()].threads.size();
93 }
94 
StealTaskBatch(WorkerThread * thread)95 unsigned int CPUWorkerManager::StealTaskBatch(WorkerThread* thread)
96 {
97     if (tearDown) {
98         return 0;
99     }
100 
101     std::shared_lock<std::shared_mutex> lck(groupCtl[thread->GetQos()].tgMutex);
102     if (GetStealingWorkers(thread->GetQos()) > groupCtl[thread->GetQos()].threads.size() / 2) {
103         return 0;
104     }
105 
106     AddStealingWorker(thread->GetQos());
107     std::unordered_map<WorkerThread*, std::unique_ptr<WorkerThread>>::iterator iter =
108         groupCtl[thread->GetQos()].threads.begin();
109     while (iter != groupCtl[thread->GetQos()].threads.end()) {
110         SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(iter->first)->localFifo);
111         unsigned int queueLen = queue->GetLength();
112         if (iter->first != thread && queueLen > 0) {
113             unsigned int popLen = queue->PopHeadToAnotherQueue(
114                 reinterpret_cast<CPUWorker*>(thread)->localFifo, (queueLen + 1) / 2, thread->GetQos(), InsertTask);
115             SubStealingWorker(thread->GetQos());
116             return popLen;
117         }
118         iter++;
119     }
120     SubStealingWorker(thread->GetQos());
121     return 0;
122 }
123 
TryPoll(const WorkerThread * thread,int timeout)124 PollerRet CPUWorkerManager::TryPoll(const WorkerThread* thread, int timeout)
125 {
126     if (tearDown || FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap()) {
127         return PollerRet::RET_NULL;
128     }
129     auto& pollerMtx = pollersMtx[thread->GetQos()];
130     if (pollerMtx.try_lock()) {
131         polling_[thread->GetQos()] = 1;
132         if (timeout == -1) {
133             monitor->IntoPollWait(thread->GetQos());
134         }
135         PollerRet ret = FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).PollOnce(timeout);
136         if (timeout == -1) {
137             monitor->OutOfPollWait(thread->GetQos());
138         }
139         polling_[thread->GetQos()] = 0;
140         pollerMtx.unlock();
141         return ret;
142     }
143     return PollerRet::RET_NULL;
144 }
145 
NotifyLocalTaskAdded(const QoS & qos)146 void CPUWorkerManager::NotifyLocalTaskAdded(const QoS& qos)
147 {
148     if (stealWorkers[qos()].load(std::memory_order_relaxed) == 0) {
149         monitor->Notify(qos, TaskNotifyType::TASK_LOCAL);
150     }
151 }
152 
NotifyTaskPicked(const WorkerThread * thread)153 void CPUWorkerManager::NotifyTaskPicked(const WorkerThread* thread)
154 {
155     monitor->Notify(thread->GetQos(), TaskNotifyType::TASK_PICKED);
156 }
157 
WorkerRetired(WorkerThread * thread)158 void CPUWorkerManager::WorkerRetired(WorkerThread* thread)
159 {
160     pid_t pid = thread->Id();
161     int qos = static_cast<int>(thread->GetQos());
162 
163     {
164         std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
165         thread->SetExited(true);
166         thread->Detach();
167         auto worker = std::move(groupCtl[qos].threads[thread]);
168         int ret = groupCtl[qos].threads.erase(thread);
169         if (ret != 1) {
170             FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
171         }
172         WorkerLeaveTg(qos, pid);
173 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
174         if (IsBlockAwareInit()) {
175             ret = BlockawareUnregister();
176             if (ret != 0) {
177                 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
178             }
179         }
180 #endif
181         worker = nullptr;
182     }
183     workerNum.fetch_sub(1);
184 }
185 
NotifyTaskAdded(const QoS & qos)186 void CPUWorkerManager::NotifyTaskAdded(const QoS& qos)
187 {
188     monitor->Notify(qos, TaskNotifyType::TASK_ADDED);
189 }
190 
NotifyWorkers(const QoS & qos,int number)191 void CPUWorkerManager::NotifyWorkers(const QoS& qos, int number)
192 {
193     monitor->NotifyWorkers(qos, number);
194 }
195 
CPUWorkerManager()196 CPUWorkerManager::CPUWorkerManager()
197 {
198     groupCtl[qos_deadline_request].tg = std::make_unique<ThreadGroup>();
199 }
200 
WorkerJoinTg(const QoS & qos,pid_t pid)201 void CPUWorkerManager::WorkerJoinTg(const QoS& qos, pid_t pid)
202 {
203     std::shared_lock<std::shared_mutex> lock(groupCtl[qos()].tgMutex);
204     if (qos == qos_user_interactive) {
205         (void)JoinWG(pid);
206         return;
207     }
208     auto& tgwrap = groupCtl[qos()];
209     if (!tgwrap.tg) {
210         return;
211     }
212 
213     if ((tgwrap.tgRefCount) == 0) {
214         return;
215     }
216 
217     tgwrap.tg->Join(pid);
218 }
219 
WorkerLeaveTg(const QoS & qos,pid_t pid)220 void CPUWorkerManager::WorkerLeaveTg(const QoS& qos, pid_t pid)
221 {
222     if (qos == qos_user_interactive) {
223         (void)LeaveWG(pid);
224         return;
225     }
226     auto& tgwrap = groupCtl[qos()];
227     if (!tgwrap.tg) {
228         return;
229     }
230 
231     if ((tgwrap.tgRefCount) == 0) {
232         return;
233     }
234 
235     tgwrap.tg->Leave(pid);
236 }
237 
238 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
IsExceedRunningThreshold(const WorkerThread * thread)239 bool CPUWorkerManager::IsExceedRunningThreshold(const WorkerThread* thread)
240 {
241     return monitor->IsExceedRunningThreshold(thread->GetQos());
242 }
243 
IsBlockAwareInit()244 bool CPUWorkerManager::IsBlockAwareInit()
245 {
246     return monitor->IsBlockAwareInit();
247 }
248 #endif
249 } // namespace ffrt
250