1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <climits>
17 #include <cstring>
18 #include <sys/stat.h>
19 #include "dfx/perf/ffrt_perf.h"
20 #include "eu/co_routine_factory.h"
21 #include "eu/cpu_manager_strategy.h"
22 #include "eu/qos_interface.h"
23 #include "eu/scpu_monitor.h"
24 #include "sched/scheduler.h"
25 #include "sched/workgroup_internal.h"
26 #include "util/ffrt_facade.h"
27 #include "util/slab.h"
28 #include "eu/scpuworker_manager.h"
29 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
30 #include "eu/blockaware.h"
31 #endif
32
33 namespace {
34 #if !defined(IDLE_WORKER_DESTRUCT)
35 constexpr int waiting_seconds = 10;
36 #else
37 constexpr int waiting_seconds = 5;
38 #endif
39
40 const std::map<std::string, void(*)(const ffrt::QoS&, void*, ffrt::TaskNotifyType)> NOTIFY_FUNCTION_FACTORY = {
41 { "CameraDaemon", ffrt::CPUMonitor::HandleTaskNotifyConservative },
42 { "bluetooth", ffrt::CPUMonitor::HandleTaskNotifyUltraConservative },
43 };
44 }
45
46 namespace ffrt {
// Maximum number of wake-and-check attempts the destructor makes per QoS level
// while waiting for that level's worker threads to exit.
constexpr int MANAGER_DESTRUCT_TIMESOUT = 1000;
// Delay, in microseconds, before a delayed wake-up task fires (5 seconds).
constexpr uint64_t DELAYED_WAKED_UP_TASK_TIME_INTERVAL = 5'000'000;
SCPUWorkerManager()49 SCPUWorkerManager::SCPUWorkerManager()
50 {
51 monitor = CPUManagerStrategy::CreateCPUMonitor(this);
52 (void)monitor->StartMonitor();
53 }
54
~SCPUWorkerManager()55 SCPUWorkerManager::~SCPUWorkerManager()
56 {
57 tearDown = true;
58 for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
59 int try_cnt = MANAGER_DESTRUCT_TIMESOUT;
60 while (try_cnt-- > 0) {
61 pollersMtx[qos].unlock();
62 FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
63 sleepCtl[qos].cv.notify_all();
64 {
65 usleep(1000);
66 std::shared_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
67 if (groupCtl[qos].threads.empty()) {
68 break;
69 }
70 }
71 }
72
73 if (try_cnt <= 0) {
74 FFRT_LOGE("erase qos[%d] threads failed", qos);
75 }
76 }
77 delete monitor;
78 }
79
WorkerRetiredSimplified(WorkerThread * thread)80 void SCPUWorkerManager::WorkerRetiredSimplified(WorkerThread* thread)
81 {
82 pid_t pid = thread->Id();
83 int qos = static_cast<int>(thread->GetQos());
84
85 bool isEmptyQosThreads = false;
86 {
87 std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
88 thread->SetExited(true);
89 thread->Detach();
90 auto worker = std::move(groupCtl[qos].threads[thread]);
91 int ret = groupCtl[qos].threads.erase(thread);
92 if (ret != 1) {
93 FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
94 }
95 isEmptyQosThreads = groupCtl[qos].threads.empty();
96 WorkerLeaveTg(QoS(qos), pid);
97 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
98 if (IsBlockAwareInit()) {
99 ret = BlockawareUnregister();
100 if (ret != 0) {
101 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
102 }
103 }
104 #endif
105 worker = nullptr;
106 }
107
108 // qos has no worker, start delay worker to monitor task
109 if (isEmptyQosThreads) {
110 std::shared_mutex& exitMtx = GetExitMtx();
111 exitMtx.lock_shared();
112 if (GetExitFlag()) {
113 exitMtx.unlock_shared();
114 return;
115 }
116 FFRT_LOGI("qos has no worker, start delay worker to monitor task, qos %d", qos);
117 AddDelayedTask(qos);
118 exitMtx.unlock_shared();
119 }
120 }
121
AddDelayedTask(int qos)122 void SCPUWorkerManager::AddDelayedTask(int qos)
123 {
124 WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
125 we->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(DELAYED_WAKED_UP_TASK_TIME_INTERVAL);
126 we->cb = ([this, qos](WaitEntry* we) {
127 int taskCount = GetTaskCount(QoS(qos));
128 std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
129 bool isEmpty = groupCtl[qos].threads.empty();
130 lck.unlock();
131
132 if (!isEmpty) {
133 SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
134 FFRT_LOGW("qos[%d] has worker, no need add delayed task", qos);
135 return;
136 }
137
138 if (taskCount != 0) {
139 FFRT_LOGI("notify task, qos %d", qos);
140 FFRTFacade::GetEUInstance().NotifyTaskAdded(QoS(qos));
141 } else {
142 AddDelayedTask(qos);
143 }
144 SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
145 });
146
147 if (!DelayedWakeup(we->tp, we, we->cb)) {
148 SimpleAllocator<WaitUntilEntry>::FreeMem(we);
149 FFRT_LOGW("add delyaed task failed, qos %d", qos);
150 }
151 }
152
WorkerIdleAction(const WorkerThread * thread)153 WorkerAction SCPUWorkerManager::WorkerIdleAction(const WorkerThread* thread)
154 {
155 if (tearDown) {
156 return WorkerAction::RETIRE;
157 }
158
159 auto& ctl = sleepCtl[thread->GetQos()];
160 std::unique_lock lk(ctl.mutex);
161 monitor->IntoSleep(thread->GetQos());
162 FFRT_PERF_WORKER_IDLE(static_cast<int>(thread->GetQos()));
163 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
164 BlockawareEnterSleeping();
165 #endif
166 if (ctl.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
167 bool taskExistence = GetTaskCount(thread->GetQos()) ||
168 reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
169 reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
170 bool needPoll = !FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap() &&
171 (polling_[thread->GetQos()] == 0);
172 return tearDown || taskExistence || needPoll;
173 })) {
174 monitor->WakeupSleep(thread->GetQos());
175 FFRT_PERF_WORKER_AWAKE(static_cast<int>(thread->GetQos()));
176 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
177 BlockawareLeaveSleeping();
178 #endif
179 return WorkerAction::RETRY;
180 } else {
181 #if !defined(IDLE_WORKER_DESTRUCT)
182 monitor->IntoDeepSleep(thread->GetQos());
183 CoStackFree();
184 if (monitor->IsExceedDeepSleepThreshold()) {
185 ffrt::CoRoutineReleaseMem();
186 }
187 ctl.cv.wait(lk, [this, thread] {
188 return tearDown || GetTaskCount(thread->GetQos()) ||
189 reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
190 reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
191 });
192 monitor->WakeupDeepSleep(thread->GetQos());
193 return WorkerAction::RETRY;
194 #else
195 monitor->TimeoutCount(thread->GetQos());
196 return WorkerAction::RETIRE;
197 #endif
198 }
199 }
200
WorkerIdleActionSimplified(const WorkerThread * thread)201 WorkerAction SCPUWorkerManager::WorkerIdleActionSimplified(const WorkerThread* thread)
202 {
203 if (tearDown) {
204 return WorkerAction::RETIRE;
205 }
206
207 auto& ctl = sleepCtl[thread->GetQos()];
208 std::unique_lock lk(ctl.mutex);
209 monitor->IntoSleep(thread->GetQos());
210 FFRT_PERF_WORKER_IDLE(static_cast<int>(thread->GetQos()));
211 if (ctl.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
212 bool taskExistence = GetTaskCount(thread->GetQos());
213 return tearDown || taskExistence;
214 })) {
215 monitor->WakeupSleep(thread->GetQos());
216 FFRT_PERF_WORKER_AWAKE(static_cast<int>(thread->GetQos()));
217 return WorkerAction::RETRY;
218 } else {
219 #if !defined(IDLE_WORKER_DESTRUCT)
220 monitor->IntoDeepSleep(thread->GetQos());
221 CoStackFree();
222 if (monitor->IsExceedDeepSleepThreshold()) {
223 ffrt::CoRoutineReleaseMem();
224 }
225 ctl.cv.wait(lk, [this, thread] {return tearDown || GetTaskCount(thread->GetQos());});
226 monitor->WakeupDeepSleep(thread->GetQos());
227 return WorkerAction::RETRY;
228 #else
229 monitor->TimeoutCount(thread->GetQos());
230 return WorkerAction::RETIRE;
231 #endif
232 }
233 }
234
WorkerPrepare(WorkerThread * thread)235 void SCPUWorkerManager::WorkerPrepare(WorkerThread* thread)
236 {
237 WorkerJoinTg(thread->GetQos(), thread->Id());
238 }
239
WakeupWorkers(const QoS & qos)240 void SCPUWorkerManager::WakeupWorkers(const QoS& qos)
241 {
242 if (tearDown) {
243 FFRT_LOGE("CPU Worker Manager exit");
244 return;
245 }
246
247 auto& ctl = sleepCtl[qos()];
248 ctl.cv.notify_one();
249 FFRT_PERF_WORKER_WAKE(static_cast<int>(qos));
250 }
251 } // namespace ffrt
252