• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "eu/scpu_monitor.h"
17 #include "dfx/trace_record/ffrt_trace_record.h"
18 #include "eu/cpu_manager_strategy.h"
19 #include "util/ffrt_facade.h"
20 
namespace {
// Suppression thresholds consulted by SCPUMonitor::Poke: a notification may be
// suppressed only when the total worker count and the running worker count
// both exceed these values.
constexpr size_t TIGGER_SUPPRESS_WORKER_COUNT = 4;
constexpr size_t TIGGER_SUPPRESS_EXECUTION_NUM = 2;
// Escape handling is only considered while the total number of workers
// (sleeping + executing) stays below this bound.
constexpr size_t MAX_ESCAPE_WORKER_NUM = 1024;
} // namespace
26 
27 namespace ffrt {
IntoSleep(const QoS & qos)28 void SCPUMonitor::IntoSleep(const QoS& qos)
29 {
30     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
31     workerCtrl.lock.lock();
32     workerCtrl.sleepingWorkerNum++;
33     workerCtrl.executionNum--;
34     workerCtrl.lock.unlock();
35 }
36 
IntoPollWait(const QoS & qos)37 void SCPUMonitor::IntoPollWait(const QoS& qos)
38 {
39     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
40     std::lock_guard lk(workerCtrl.lock);
41     workerCtrl.pollWaitFlag = true;
42 }
43 
Notify(const QoS & qos,TaskNotifyType notifyType)44 void SCPUMonitor::Notify(const QoS& qos, TaskNotifyType notifyType)
45 {
46     GetOps().HandleTaskNotity(qos, this, notifyType);
47 }
48 
WorkerInit()49 void SCPUMonitor::WorkerInit()
50 {
51     return;
52 }
53 
54 // default strategy which is kind of radical for poking workers
HandleTaskNotifyDefault(const QoS & qos,void * monitorPtr,TaskNotifyType notifyType)55 void SCPUMonitor::HandleTaskNotifyDefault(const QoS& qos, void* monitorPtr, TaskNotifyType notifyType)
56 {
57     SCPUMonitor* monitor = reinterpret_cast<SCPUMonitor*>(monitorPtr);
58     size_t taskCount = static_cast<size_t>(monitor->GetOps().GetTaskCount(qos));
59     switch (notifyType) {
60         case TaskNotifyType::TASK_ADDED:
61         case TaskNotifyType::TASK_PICKED:
62         case TaskNotifyType::TASK_ESCAPED:
63             if (taskCount > 0) {
64                 monitor->Poke(qos, taskCount, notifyType);
65             }
66             break;
67         case TaskNotifyType::TASK_LOCAL:
68                 monitor->Poke(qos, taskCount, notifyType);
69             break;
70         default:
71             break;
72     }
73 }
74 
75 // conservative strategy for poking workers
HandleTaskNotifyConservative(const QoS & qos,void * monitorPtr,TaskNotifyType notifyType)76 void SCPUMonitor::HandleTaskNotifyConservative(const QoS& qos, void* monitorPtr, TaskNotifyType notifyType)
77 {
78     SCPUMonitor* monitor = reinterpret_cast<SCPUMonitor*>(monitorPtr);
79     int taskCount = monitor->ops.GetTaskCount(qos);
80     if (taskCount == 0) {
81         // no available task in global queue, skip
82         return;
83     }
84     constexpr double thresholdTaskPick = 1.0;
85     WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
86     workerCtrl.lock.lock();
87 
88     if (notifyType == TaskNotifyType::TASK_PICKED) {
89         int wakedWorkerCount = workerCtrl.executionNum;
90         double remainingLoadRatio = (wakedWorkerCount == 0) ? static_cast<double>(workerCtrl.maxConcurrency) :
91             static_cast<double>(taskCount) / static_cast<double>(wakedWorkerCount);
92         if (remainingLoadRatio <= thresholdTaskPick) {
93             // for task pick, wake worker when load ratio > 1
94             workerCtrl.lock.unlock();
95             return;
96         }
97     }
98 
99     if (static_cast<uint32_t>(workerCtrl.executionNum) < workerCtrl.maxConcurrency) {
100         if (workerCtrl.sleepingWorkerNum == 0) {
101             FFRT_LOGI("begin to create worker, notifyType[%d]"
102                 "execnum[%d], maxconcur[%d], slpnum[%d], dslpnum[%d]",
103                 notifyType, workerCtrl.executionNum, workerCtrl.maxConcurrency,
104                 workerCtrl.sleepingWorkerNum, workerCtrl.deepSleepingWorkerNum);
105             workerCtrl.executionNum++;
106             workerCtrl.lock.unlock();
107             monitor->ops.IncWorker(qos);
108         } else {
109             workerCtrl.lock.unlock();
110             monitor->ops.WakeupWorkers(qos);
111         }
112     } else {
113         if (workerCtrl.pollWaitFlag) {
114             FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
115         }
116         workerCtrl.lock.unlock();
117     }
118 }
119 
HandleTaskNotifyUltraConservative(const QoS & qos,void * monitorPtr,TaskNotifyType notifyType)120 void SCPUMonitor::HandleTaskNotifyUltraConservative(const QoS& qos, void* monitorPtr, TaskNotifyType notifyType)
121 {
122     (void)notifyType;
123     SCPUMonitor* monitor = reinterpret_cast<SCPUMonitor*>(monitorPtr);
124     int taskCount = monitor->ops.GetTaskCount(qos);
125     if (taskCount == 0) {
126         // no available task in global queue, skip
127         return;
128     }
129 
130     WorkerCtrl& workerCtrl = monitor->ctrlQueue[static_cast<int>(qos)];
131     std::lock_guard lock(workerCtrl.lock);
132 
133     int runningNum = static_cast<int>(monitor->GetRunningNum(qos));
134 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
135     if (monitor->blockAwareInit && !monitor->stopMonitor && taskCount == runningNum) {
136         return;
137     }
138 #endif
139 
140     if (taskCount < runningNum) {
141         return;
142     }
143 
144     if (runningNum < static_cast<int>(workerCtrl.maxConcurrency)) {
145         if (workerCtrl.sleepingWorkerNum == 0) {
146             workerCtrl.executionNum++;
147             monitor->ops.IncWorker(qos);
148         } else {
149             monitor->ops.WakeupWorkers(qos);
150         }
151     }
152 }
153 
Poke(const QoS & qos,uint32_t taskCount,TaskNotifyType notifyType)154 void SCPUMonitor::Poke(const QoS& qos, uint32_t taskCount, TaskNotifyType notifyType)
155 {
156     WorkerCtrl& workerCtrl = ctrlQueue[static_cast<int>(qos)];
157     workerCtrl.lock.lock();
158     size_t runningNum = GetRunningNum(qos);
159     size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
160 
161     bool tiggerSuppression = (totalNum > TIGGER_SUPPRESS_WORKER_COUNT) &&
162         (runningNum > TIGGER_SUPPRESS_EXECUTION_NUM) && (taskCount < runningNum);
163     if (notifyType != TaskNotifyType::TASK_ADDED && notifyType != TaskNotifyType::TASK_ESCAPED && tiggerSuppression) {
164         workerCtrl.lock.unlock();
165         return;
166     }
167 
168     if ((static_cast<uint32_t>(workerCtrl.sleepingWorkerNum) > 0) && (runningNum < workerCtrl.maxConcurrency)) {
169         workerCtrl.lock.unlock();
170         ops.WakeupWorkers(qos);
171     } else if ((runningNum < workerCtrl.maxConcurrency) && (totalNum < workerCtrl.hardLimit)) {
172         workerCtrl.executionNum++;
173         FFRTTraceRecord::WorkRecord(qos(), workerCtrl.executionNum);
174         workerCtrl.lock.unlock();
175         ops.IncWorker(qos);
176     } else if (escapeMgr_.IsEscapeEnable() && (runningNum == 0) && (totalNum < MAX_ESCAPE_WORKER_NUM)) {
177         escapeMgr_.SubmitEscape(qos, totalNum, this);
178         workerCtrl.lock.unlock();
179     } else {
180         if (workerCtrl.pollWaitFlag) {
181             FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
182         }
183         workerCtrl.lock.unlock();
184     }
185 }
186 
ExecuteEscape(int qos,void * monitorPtr)187 void SCPUMonitor::ExecuteEscape(int qos, void* monitorPtr)
188 {
189     SCPUMonitor* monitor = reinterpret_cast<SCPUMonitor*>(monitorPtr);
190     if (monitor->escapeMgr_.IsEscapeEnable() && monitor->ops.GetTaskCount(qos) > 0) {
191         WorkerCtrl& workerCtrl = monitor->ctrlQueue[qos];
192         workerCtrl.lock.lock();
193 
194         size_t runningNum = monitor->GetRunningNum(qos);
195         size_t totalNum = static_cast<size_t>(workerCtrl.sleepingWorkerNum + workerCtrl.executionNum);
196         if ((workerCtrl.sleepingWorkerNum > 0) && (runningNum < workerCtrl.maxConcurrency)) {
197             workerCtrl.lock.unlock();
198             monitor->ops.WakeupWorkers(qos);
199         } else if ((runningNum == 0) && (totalNum < MAX_ESCAPE_WORKER_NUM)) {
200             workerCtrl.executionNum++;
201             FFRTTraceRecord::WorkRecord(qos, workerCtrl.executionNum);
202             workerCtrl.lock.unlock();
203             monitor->ops.IncWorker(qos);
204             monitor->ReportEscapeEvent(qos, totalNum);
205         } else {
206             if (workerCtrl.pollWaitFlag) {
207                 FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
208             }
209             workerCtrl.lock.unlock();
210         }
211     }
212 }
213 }