1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <climits>
17 #include <cstring>
18 #include <sys/stat.h>
19 #include "dfx/perf/ffrt_perf.h"
20 #include "eu/co_routine_factory.h"
21 #include "eu/cpu_manager_strategy.h"
22 #include "eu/qos_interface.h"
23 #include "eu/scpu_monitor.h"
24 #include "sched/scheduler.h"
25 #include "sched/workgroup_internal.h"
26 #include "util/ffrt_facade.h"
27 #include "util/slab.h"
28 #include "eu/scpuworker_manager.h"
29 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
30 #include "eu/blockaware.h"
31 #endif
32
namespace {
/*
 * waiting_seconds is the idle timeout, in seconds, that a worker waits on its QoS sleep condition
 * variable before the timeout branch of WorkerIdleAction runs.
 *
 * SUPPORT_WORKER_DESTRUCT marks builds in which idle worker threads may be destroyed (retired) on
 * that timeout; those builds use the shorter value.
 * Note: coroutine switch-out and switch-in save and restore the stack canary. At present only the
 * canary used by the ohos compiler's stack protection is global, so it is not affected when a
 * worker is destroyed.
 */
#ifdef SUPPORT_WORKER_DESTRUCT
constexpr int waiting_seconds = 5;
#else
constexpr int waiting_seconds = 10;
#endif
} // namespace
45
46 namespace ffrt {
// Maximum number of wake-and-check rounds the destructor spends per QoS level waiting for that
// level's workers to exit (each round sleeps about 1 ms).
constexpr int MANAGER_DESTRUCT_TIMESOUT = 1000;
// Delay, in microseconds (5 s), before re-examining a QoS level that was left without workers.
constexpr uint64_t DELAYED_WAKED_UP_TASK_TIME_INTERVAL = 5'000'000;
SCPUWorkerManager()49 SCPUWorkerManager::SCPUWorkerManager()
50 {
51 monitor = CPUManagerStrategy::CreateCPUMonitor(this);
52 (void)monitor->StartMonitor();
53 }
54
~SCPUWorkerManager()55 SCPUWorkerManager::~SCPUWorkerManager()
56 {
57 tearDown = true;
58 for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
59 int try_cnt = MANAGER_DESTRUCT_TIMESOUT;
60 while (try_cnt-- > 0) {
61 pollersMtx[qos].unlock();
62 FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
63 {
64 auto& ctl = sleepCtl[qos];
65 std::lock_guard lk(ctl.mutex);
66 sleepCtl[qos].cv.notify_all();
67 }
68 {
69 usleep(1000);
70 std::shared_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
71 if (groupCtl[qos].threads.empty()) {
72 break;
73 }
74 }
75 }
76
77 if (try_cnt <= 0) {
78 FFRT_LOGE("erase qos[%d] threads failed", qos);
79 }
80 }
81 delete monitor;
82 }
83
WorkerRetiredSimplified(WorkerThread * thread)84 void SCPUWorkerManager::WorkerRetiredSimplified(WorkerThread* thread)
85 {
86 pid_t pid = thread->Id();
87 int qos = static_cast<int>(thread->GetQos());
88
89 bool isEmptyQosThreads = false;
90 {
91 std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
92 thread->SetExited(true);
93 thread->Detach();
94 auto worker = std::move(groupCtl[qos].threads[thread]);
95 int ret = groupCtl[qos].threads.erase(thread);
96 if (ret != 1) {
97 FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
98 }
99 isEmptyQosThreads = groupCtl[qos].threads.empty();
100 WorkerLeaveTg(QoS(qos), pid);
101 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
102 if (IsBlockAwareInit()) {
103 ret = BlockawareUnregister();
104 if (ret != 0) {
105 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
106 }
107 }
108 #endif
109 worker = nullptr;
110 }
111
112 // qos has no worker, start delay worker to monitor task
113 if (isEmptyQosThreads) {
114 std::shared_mutex& exitMtx = GetExitMtx();
115 exitMtx.lock_shared();
116 if (GetExitFlag()) {
117 exitMtx.unlock_shared();
118 return;
119 }
120 FFRT_LOGI("qos has no worker, start delay worker to monitor task, qos %d", qos);
121 AddDelayedTask(qos);
122 exitMtx.unlock_shared();
123 }
124 }
125
PickUpTaskBatch(WorkerThread * thread)126 TaskBase* SCPUWorkerManager::PickUpTaskBatch(WorkerThread* thread)
127 {
128 if (tearDown) {
129 return nullptr;
130 }
131
132 auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
133 auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
134 std::lock_guard lg(*lock);
135 TaskBase* task = sched.PickNextTask();
136
137 #ifdef FFRT_LOCAL_QUEUE_ENABLE
138 if (task == nullptr) {
139 return nullptr;
140 }
141
142 int wakedWorkerNum = monitor->WakedWorkerNum(thread->GetQos());
143 // when there is only one worker, the global queue is equivalent to the local queue
144 // prevents local queue tasks that cannot be executed due to blocking tasks
145 if (wakedWorkerNum <= 1) {
146 return task;
147 }
148
149 SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(thread)->localFifo);
150 int expectedTask = GetTaskCount(thread->GetQos()) / wakedWorkerNum - 1;
151 for (int i = 0; i < expectedTask; i++) {
152 if (queue->GetLength() == queue->GetCapacity()) {
153 return task;
154 }
155
156 TaskBase* task2local = sched.PickNextTask();
157 if (task2local == nullptr) {
158 return task;
159 }
160
161 queue->PushTail(task2local);
162 }
163 #endif
164
165 return task;
166 }
167
AddDelayedTask(int qos)168 void SCPUWorkerManager::AddDelayedTask(int qos)
169 {
170 WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
171 we->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(DELAYED_WAKED_UP_TASK_TIME_INTERVAL);
172 we->cb = ([this, qos](WaitEntry* we) {
173 int taskCount = GetTaskCount(QoS(qos));
174 std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
175 bool isEmpty = groupCtl[qos].threads.empty();
176 lck.unlock();
177
178 if (!isEmpty) {
179 SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
180 FFRT_LOGW("qos[%d] has worker, no need add delayed task", qos);
181 return;
182 }
183
184 if (taskCount != 0) {
185 FFRT_LOGI("notify task, qos %d", qos);
186 FFRTFacade::GetEUInstance().NotifyTaskAdded(QoS(qos));
187 } else {
188 AddDelayedTask(qos);
189 }
190 SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
191 });
192
193 if (!DelayedWakeup(we->tp, we, we->cb)) {
194 SimpleAllocator<WaitUntilEntry>::FreeMem(we);
195 FFRT_LOGW("add delyaed task failed, qos %d", qos);
196 }
197 }
198
/*
 * Decides what an idle worker does next; called with the worker's own thread object.
 *  - Returns RETIRE immediately if the manager is tearing down.
 *  - Otherwise parks the worker on its QoS sleep condition variable for up to waiting_seconds.
 *    It is woken (RETRY) by teardown, pending tasks for the QoS, or a poller that has registered
 *    entries while polling_[qos] == 0.
 *  - On timeout without SUPPORT_WORKER_DESTRUCT: enters "deep sleep" (CoStackFree(), plus
 *    CoRoutineReleaseMem() when the monitor reports the deep-sleep threshold is exceeded). It then
 *    waits with no timeout and returns RETRY.
 *  - On timeout with SUPPORT_WORKER_DESTRUCT: counts the timeout in the monitor and returns RETIRE.
 * The sleep-control mutex is held for the whole call except while blocked in the waits.
 */
WorkerAction SCPUWorkerManager::WorkerIdleAction(const WorkerThread* thread)
{
    if (tearDown) {
        return WorkerAction::RETIRE;
    }

    auto& ctl = sleepCtl[thread->GetQos()];
    std::unique_lock lk(ctl.mutex);
    // Report the transition to the monitor (and block-aware, when enabled) before blocking.
    monitor->IntoSleep(thread->GetQos());
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
    BlockawareEnterSleeping();
#endif
    // wait_for with a predicate returns true if the predicate became true, false on timeout.
    if (ctl.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
        bool taskExistence = GetTaskCount(thread->GetQos());
        // Poller has registered entries and polling_ for this QoS is 0
        // (presumably: no worker is currently polling - confirm).
        bool needPoll = !FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap() &&
            (polling_[thread->GetQos()] == 0);
        return tearDown || taskExistence || needPoll;
        })) {
        monitor->WakeupSleep(thread->GetQos());
#ifdef FFRT_WORKERS_DYNAMIC_SCALING
        BlockawareLeaveSleeping();
#endif
        return WorkerAction::RETRY;
    } else {
#if !defined(SUPPORT_WORKER_DESTRUCT)
        // Idle timeout without worker destruction: release coroutine resources and sleep until work appears.
        monitor->IntoDeepSleep(thread->GetQos());
        CoStackFree();
        if (monitor->IsExceedDeepSleepThreshold()) {
            ffrt::CoRoutineReleaseMem();
        }
        // Unlike the first wait, this one has no timeout. It also wakes for this worker's
        // priority_task or non-empty localFifo, but does not consider pollers.
        ctl.cv.wait(lk, [this, thread] {
            return tearDown || GetTaskCount(thread->GetQos()) ||
            reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
            reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
            });
        monitor->WakeupDeepSleep(thread->GetQos());
        // NOTE(review): unlike the normal wake-up path, this path does not call
        // BlockawareLeaveSleeping() under FFRT_WORKERS_DYNAMIC_SCALING - confirm this is intended.
        return WorkerAction::RETRY;
#else
        // Idle timeout with worker destruction supported: record it and let the worker retire.
        monitor->TimeoutCount(thread->GetQos());
        return WorkerAction::RETIRE;
#endif
    }
}
242
WorkerPrepare(WorkerThread * thread)243 void SCPUWorkerManager::WorkerPrepare(WorkerThread* thread)
244 {
245 WorkerJoinTg(thread->GetQos(), thread->Id());
246 }
247
WakeupWorkers(const QoS & qos)248 void SCPUWorkerManager::WakeupWorkers(const QoS& qos)
249 {
250 if (tearDown) {
251 FFRT_LOGE("CPU Worker Manager exit");
252 return;
253 }
254
255 auto& ctl = sleepCtl[qos()];
256 ctl.cv.notify_one();
257 }
258 } // namespace ffrt
259