1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "eu/scpu_monitor.h"

#include <mutex>

#include "dfx/trace_record/ffrt_trace_record.h"
#include "eu/cpu_manager_strategy.h"
#include "util/ffrt_facade.h"
20
namespace {
// Poke suppression thresholds (see SCPUMonitor::Poke): low-priority pokes are
// dropped once the pool already has more than this many workers in total and
// more than this many actually running. "TIGGER" is the existing (misspelled)
// naming, kept as-is because other code in this file references it.
constexpr size_t TIGGER_SUPPRESS_WORKER_COUNT = 4;
constexpr size_t TIGGER_SUPPRESS_EXECUTION_NUM = 2;
// Hard cap on the total worker count that escape handling may grow the pool to.
constexpr size_t MAX_ESCAPE_WORKER_NUM = 1024;
}
26
27 namespace ffrt {
IntoSleep(const QoS & qos)28 void SCPUMonitor::IntoSleep(const QoS& qos)
29 {
30 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
31 workerCtrl.lock.lock();
32 workerCtrl.sleepingWorkerNum++;
33 workerCtrl.executionNum--;
34 workerCtrl.lock.unlock();
35 }
36
IntoPollWait(const QoS & qos)37 void SCPUMonitor::IntoPollWait(const QoS& qos)
38 {
39 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
40 std::lock_guard lk(workerCtrl.lock);
41 workerCtrl.pollWaitFlag = true;
42 }
43
Notify(const QoS & qos,TaskNotifyType notifyType)44 void SCPUMonitor::Notify(const QoS& qos, TaskNotifyType notifyType)
45 {
46 GetOps().HandleTaskNotity(qos, this, notifyType);
47 }
48
WorkerInit()49 void SCPUMonitor::WorkerInit()
50 {
51 return;
52 }
53
54 // default strategy which is kind of radical for poking workers
HandleTaskNotifyDefault(const QoS & qos,void * monitorPtr,TaskNotifyType notifyType)55 void SCPUMonitor::HandleTaskNotifyDefault(const QoS& qos, void* monitorPtr, TaskNotifyType notifyType)
56 {
57 SCPUMonitor* monitor = reinterpret_cast<SCPUMonitor*>(monitorPtr);
58 size_t taskCount = static_cast<size_t>(monitor->GetOps().GetTaskCount(qos));
59 switch (notifyType) {
60 case TaskNotifyType::TASK_ADDED:
61 case TaskNotifyType::TASK_PICKED:
62 case TaskNotifyType::TASK_ESCAPED:
63 if (taskCount > 0) {
64 monitor->Poke(qos, taskCount, notifyType);
65 }
66 break;
67 case TaskNotifyType::TASK_LOCAL:
68 monitor->Poke(qos, taskCount, notifyType);
69 break;
70 default:
71 break;
72 }
73 }
74
75 // conservative strategy for poking workers
HandleTaskNotifyConservative(const QoS & qos,void * monitorPtr,TaskNotifyType notifyType)76 void SCPUMonitor::HandleTaskNotifyConservative(const QoS& qos, void* monitorPtr, TaskNotifyType notifyType)
77 {
78 SCPUMonitor* monitor = reinterpret_cast<SCPUMonitor*>(monitorPtr);
79 int taskCount = monitor->ops.GetTaskCount(qos);
80 if (taskCount == 0) {
81 // no available task in global queue, skip
82 return;
83 }
84 constexpr double thresholdTaskPick = 1.0;
85 WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
86 workerCtrl.lock.lock();
87
88 if (notifyType == TaskNotifyType::TASK_PICKED) {
89 int wakedWorkerCount = workerCtrl.executionNum;
90 double remainingLoadRatio = (wakedWorkerCount == 0) ? static_cast<double>(workerCtrl.maxConcurrency) :
91 static_cast<double>(taskCount) / static_cast<double>(wakedWorkerCount);
92 if (remainingLoadRatio <= thresholdTaskPick) {
93 // for task pick, wake worker when load ratio > 1
94 workerCtrl.lock.unlock();
95 return;
96 }
97 }
98
99 if (static_cast<uint32_t>(workerCtrl.executionNum) < workerCtrl.maxConcurrency) {
100 if (workerCtrl.sleepingWorkerNum == 0) {
101 FFRT_LOGI("begin to create worker, notifyType[%d]"
102 "execnum[%d], maxconcur[%d], slpnum[%d], dslpnum[%d]",
103 notifyType, workerCtrl.executionNum, workerCtrl.maxConcurrency,
104 workerCtrl.sleepingWorkerNum, workerCtrl.deepSleepingWorkerNum);
105 workerCtrl.executionNum++;
106 workerCtrl.lock.unlock();
107 monitor->ops.IncWorker(qos);
108 } else {
109 workerCtrl.lock.unlock();
110 monitor->ops.WakeupWorkers(qos);
111 }
112 } else {
113 if (workerCtrl.pollWaitFlag) {
114 FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
115 }
116 workerCtrl.lock.unlock();
117 }
118 }
119
HandleTaskNotifyUltraConservative(const QoS & qos,void * monitorPtr,TaskNotifyType notifyType)120 void SCPUMonitor::HandleTaskNotifyUltraConservative(const QoS& qos, void* monitorPtr, TaskNotifyType notifyType)
121 {
122 (void)notifyType;
123 SCPUMonitor* monitor = reinterpret_cast<SCPUMonitor*>(monitorPtr);
124 int taskCount = monitor->ops.GetTaskCount(qos);
125 if (taskCount == 0) {
126 // no available task in global queue, skip
127 return;
128 }
129
130 WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
131 std::lock_guard lock(workerCtrl.lock);
132
133 int runningNum = static_cast<int>(monitor->GetRunningNum(qos));
134 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
135 if (monitor->blockAwareInit && !monitor->stopMonitor && taskCount == runningNum) {
136 return;
137 }
138 #endif
139
140 if (taskCount < runningNum) {
141 return;
142 }
143
144 if (runningNum < static_cast<int>(workerCtrl.maxConcurrency)) {
145 if (workerCtrl.sleepingWorkerNum == 0) {
146 workerCtrl.executionNum++;
147 monitor->ops.IncWorker(qos);
148 } else {
149 monitor->ops.WakeupWorkers(qos);
150 }
151 }
152 }
153
Poke(const QoS & qos,uint32_t taskCount,TaskNotifyType notifyType)154 void SCPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyType)
155 {
156 WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
157 workerCtrl.lock.lock();
158 size_t runningNum = GetRunningNum(qos);
159 size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
160
161 bool tiggerSuppression = (totalNum > TIGGER_SUPPRESS_WORKER_COUNT) &&
162 (runningNum > TIGGER_SUPPRESS_EXECUTION_NUM) && (taskCount < runningNum);
163 if (notifyType != TaskNotifyType::TASK_ADDED && notifyType != TaskNotifyType::TASK_ESCAPED && tiggerSuppression) {
164 workerCtrl.lock.unlock();
165 return;
166 }
167
168 if ((static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) && (runningNum < workerCtrl.maxConcurrency)) {
169 workerCtrl.lock.unlock();
170 ops.WakeupWorkers(qos);
171 } else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
172 workerCtrl.executionNum++;
173 FFRTTraceRecord::WorkRecord(qos(), workerCtrl.executionNum);
174 workerCtrl.lock.unlock();
175 ops.IncWorker(qos);
176 } else if (escapeMgr_.IsEscapeEnable() && (runningNum == 0) && (totalNum < MAX_ESCAPE_WORKER_NUM)) {
177 escapeMgr_.SubmitEscape(qos, totalNum, this);
178 workerCtrl.lock.unlock();
179 } else {
180 if (workerCtrl.pollWaitFlag) {
181 FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
182 }
183 workerCtrl.lock.unlock();
184 }
185 }
186
ExecuteEscape(int qos,void * monitorPtr)187 void SCPUMonitor::ExecuteEscape(int qos, void* monitorPtr)
188 {
189 SCPUMonitor* monitor = reinterpret_cast<SCPUMonitor*>(monitorPtr);
190 if (monitor->escapeMgr_.IsEscapeEnable() && monitor->ops.GetTaskCount(qos) > 0) {
191 WorkerCtrl& workerCtrl = monitor->ctrlQueue[qos];
192 workerCtrl.lock.lock();
193
194 size_t runningNum = monitor->GetRunningNum(qos);
195 size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
196 if ((workerCtrl.sleepingWorkerNum > 0) && (runningNum < workerCtrl.maxConcurrency)) {
197 workerCtrl.lock.unlock();
198 monitor->ops.WakeupWorkers(qos);
199 } else if ((runningNum == 0) && (totalNum < MAX_ESCAPE_WORKER_NUM)) {
200 workerCtrl.executionNum++;
201 FFRTTraceRecord::WorkRecord(qos, workerCtrl.executionNum);
202 workerCtrl.lock.unlock();
203 monitor->ops.IncWorker(qos);
204 monitor->ReportEscapeEvent(qos, totalNum);
205 } else {
206 if (workerCtrl.pollWaitFlag) {
207 FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
208 }
209 workerCtrl.lock.unlock();
210 }
211 }
212 }
213 }