• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <climits>
17 #include <cstring>
18 #include <sys/stat.h>
19 #include "dfx/perf/ffrt_perf.h"
20 #include "eu/co_routine_factory.h"
21 #include "eu/cpu_manager_strategy.h"
22 #include "eu/qos_interface.h"
23 #include "eu/scpu_monitor.h"
24 #include "sched/scheduler.h"
25 #include "sched/workgroup_internal.h"
26 #include "util/ffrt_facade.h"
27 #include "util/slab.h"
28 #include "eu/scpuworker_manager.h"
29 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
30 #include "eu/blockaware.h"
31 #endif
32 
33 namespace {
34 #if !defined(IDLE_WORKER_DESTRUCT)
35 constexpr int waiting_seconds = 10;
36 #else
37 constexpr int waiting_seconds = 5;
38 #endif
39 
40 const std::map<std::string, void(*)(const ffrt::QoS&, void*, ffrt::TaskNotifyType)> NOTIFY_FUNCTION_FACTORY = {
41     { "CameraDaemon", ffrt::CPUMonitor::HandleTaskNotifyConservative },
42     { "bluetooth", ffrt::CPUMonitor::HandleTaskNotifyUltraConservative },
43 };
44 }
45 
46 namespace ffrt {
47 constexpr int MANAGER_DESTRUCT_TIMESOUT = 1000;
48 constexpr uint64_t DELAYED_WAKED_UP_TASK_TIME_INTERVAL = 5 * 1000 * 1000;
SCPUWorkerManager()49 SCPUWorkerManager::SCPUWorkerManager()
50 {
51     monitor = CPUManagerStrategy::CreateCPUMonitor(this);
52     (void)monitor->StartMonitor();
53 }
54 
~SCPUWorkerManager()55 SCPUWorkerManager::~SCPUWorkerManager()
56 {
57     tearDown = true;
58     for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
59         int try_cnt = MANAGER_DESTRUCT_TIMESOUT;
60         while (try_cnt-- > 0) {
61             pollersMtx[qos].unlock();
62             FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
63             sleepCtl[qos].cv.notify_all();
64             {
65                 usleep(1000);
66                 std::shared_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
67                 if (groupCtl[qos].threads.empty()) {
68                     break;
69                 }
70             }
71         }
72 
73         if (try_cnt <= 0) {
74             FFRT_LOGE("erase qos[%d] threads failed", qos);
75         }
76     }
77     delete monitor;
78 }
79 
WorkerRetiredSimplified(WorkerThread * thread)80 void SCPUWorkerManager::WorkerRetiredSimplified(WorkerThread* thread)
81 {
82     pid_t pid = thread->Id();
83     int qos = static_cast<int>(thread->GetQos());
84 
85     bool isEmptyQosThreads = false;
86     {
87         std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
88         thread->SetExited(true);
89         thread->Detach();
90         auto worker = std::move(groupCtl[qos].threads[thread]);
91         int ret = groupCtl[qos].threads.erase(thread);
92         if (ret != 1) {
93             FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
94         }
95         isEmptyQosThreads = groupCtl[qos].threads.empty();
96         WorkerLeaveTg(QoS(qos), pid);
97 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
98         if (IsBlockAwareInit()) {
99             ret = BlockawareUnregister();
100             if (ret != 0) {
101                 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
102             }
103         }
104 #endif
105         worker = nullptr;
106     }
107 
108     // qos has no worker, start delay worker to monitor task
109     if (isEmptyQosThreads) {
110         std::shared_mutex& exitMtx = GetExitMtx();
111         exitMtx.lock_shared();
112         if (GetExitFlag()) {
113             exitMtx.unlock_shared();
114             return;
115         }
116         FFRT_LOGI("qos has no worker, start delay worker to monitor task, qos %d", qos);
117         AddDelayedTask(qos);
118         exitMtx.unlock_shared();
119     }
120 }
121 
AddDelayedTask(int qos)122 void SCPUWorkerManager::AddDelayedTask(int qos)
123 {
124     WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
125     we->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(DELAYED_WAKED_UP_TASK_TIME_INTERVAL);
126     we->cb = ([this, qos](WaitEntry* we) {
127         int taskCount = GetTaskCount(QoS(qos));
128         std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
129         bool isEmpty = groupCtl[qos].threads.empty();
130         lck.unlock();
131 
132         if (!isEmpty) {
133             SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
134             FFRT_LOGW("qos[%d] has worker, no need add delayed task", qos);
135             return;
136         }
137 
138         if (taskCount != 0) {
139             FFRT_LOGI("notify task, qos %d", qos);
140             FFRTFacade::GetEUInstance().NotifyTaskAdded(QoS(qos));
141         } else {
142             AddDelayedTask(qos);
143         }
144         SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
145     });
146 
147     if (!DelayedWakeup(we->tp, we, we->cb)) {
148         SimpleAllocator<WaitUntilEntry>::FreeMem(we);
149         FFRT_LOGW("add delyaed task failed, qos %d", qos);
150     }
151 }
152 
WorkerIdleAction(const WorkerThread * thread)153 WorkerAction SCPUWorkerManager::WorkerIdleAction(const WorkerThread* thread)
154 {
155     if (tearDown) {
156         return WorkerAction::RETIRE;
157     }
158 
159     auto& ctl = sleepCtl[thread->GetQos()];
160     std::unique_lock lk(ctl.mutex);
161     monitor->IntoSleep(thread->GetQos());
162     FFRT_PERF_WORKER_IDLE(static_cast<int>(thread->GetQos()));
163 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
164     BlockawareEnterSleeping();
165 #endif
166     if (ctl.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
167         bool taskExistence = GetTaskCount(thread->GetQos()) ||
168             reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
169             reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
170         bool needPoll = !FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap() &&
171             (polling_[thread->GetQos()] == 0);
172         return tearDown || taskExistence || needPoll;
173         })) {
174         monitor->WakeupSleep(thread->GetQos());
175         FFRT_PERF_WORKER_AWAKE(static_cast<int>(thread->GetQos()));
176 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
177         BlockawareLeaveSleeping();
178 #endif
179         return WorkerAction::RETRY;
180     } else {
181 #if !defined(IDLE_WORKER_DESTRUCT)
182         monitor->IntoDeepSleep(thread->GetQos());
183         CoStackFree();
184         if (monitor->IsExceedDeepSleepThreshold()) {
185             ffrt::CoRoutineReleaseMem();
186         }
187         ctl.cv.wait(lk, [this, thread] {
188             return tearDown || GetTaskCount(thread->GetQos()) ||
189             reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
190             reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
191             });
192         monitor->WakeupDeepSleep(thread->GetQos());
193         return WorkerAction::RETRY;
194 #else
195         monitor->TimeoutCount(thread->GetQos());
196         return WorkerAction::RETIRE;
197 #endif
198     }
199 }
200 
WorkerIdleActionSimplified(const WorkerThread * thread)201 WorkerAction SCPUWorkerManager::WorkerIdleActionSimplified(const WorkerThread* thread)
202 {
203     if (tearDown) {
204         return WorkerAction::RETIRE;
205     }
206 
207     auto& ctl = sleepCtl[thread->GetQos()];
208     std::unique_lock lk(ctl.mutex);
209     monitor->IntoSleep(thread->GetQos());
210     FFRT_PERF_WORKER_IDLE(static_cast<int>(thread->GetQos()));
211     if (ctl.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
212         bool taskExistence = GetTaskCount(thread->GetQos());
213         return tearDown || taskExistence;
214         })) {
215         monitor->WakeupSleep(thread->GetQos());
216         FFRT_PERF_WORKER_AWAKE(static_cast<int>(thread->GetQos()));
217         return WorkerAction::RETRY;
218     } else {
219 #if !defined(IDLE_WORKER_DESTRUCT)
220         monitor->IntoDeepSleep(thread->GetQos());
221         CoStackFree();
222         if (monitor->IsExceedDeepSleepThreshold()) {
223             ffrt::CoRoutineReleaseMem();
224         }
225         ctl.cv.wait(lk, [this, thread] {return tearDown || GetTaskCount(thread->GetQos());});
226         monitor->WakeupDeepSleep(thread->GetQos());
227         return WorkerAction::RETRY;
228 #else
229         monitor->TimeoutCount(thread->GetQos());
230         return WorkerAction::RETIRE;
231 #endif
232     }
233 }
234 
WorkerPrepare(WorkerThread * thread)235 void SCPUWorkerManager::WorkerPrepare(WorkerThread* thread)
236 {
237     WorkerJoinTg(thread->GetQos(), thread->Id());
238 }
239 
WakeupWorkers(const QoS & qos)240 void SCPUWorkerManager::WakeupWorkers(const QoS& qos)
241 {
242     if (tearDown) {
243         FFRT_LOGE("CPU Worker Manager exit");
244         return;
245     }
246 
247     auto& ctl = sleepCtl[qos()];
248     ctl.cv.notify_one();
249     FFRT_PERF_WORKER_WAKE(static_cast<int>(qos));
250 }
251 } // namespace ffrt
252