• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <climits>
17 #include <cstring>
18 #include <sys/stat.h>
19 #include "dfx/perf/ffrt_perf.h"
20 #include "eu/co_routine_factory.h"
21 #include "eu/cpu_manager_strategy.h"
22 #include "eu/qos_interface.h"
23 #include "eu/scpu_monitor.h"
24 #include "sched/scheduler.h"
25 #include "sched/workgroup_internal.h"
26 #include "util/ffrt_facade.h"
27 #include "util/slab.h"
28 #include "eu/scpuworker_manager.h"
29 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
30 #include "eu/blockaware.h"
31 #endif
32 
namespace {
/*
 * Idle timeout, in seconds, that a worker waits on its sleep condition variable before giving up.
 *
 * When SUPPORT_WORKER_DESTRUCT is defined, a worker that times out is retired (its thread may be
 * destroyed), so a shorter wait is used; otherwise the worker falls into a deep sleep instead.
 * Worker destruction is safe with respect to coroutines because the stack canary saved/restored on
 * coroutine switch-out/switch-in is, for the ohos compiler's stack protection, a single global value
 * that is unaffected when a worker thread goes away.
 */
#ifdef SUPPORT_WORKER_DESTRUCT
constexpr int waiting_seconds = 5;
#else
constexpr int waiting_seconds = 10;
#endif
} // namespace
45 
46 namespace ffrt {
// Number of ~1 ms polling rounds the destructor spends waiting for one QoS level's workers to exit.
constexpr int MANAGER_DESTRUCT_TIMESOUT = 1000;
// Delay before a worker-less QoS level re-checks for pending tasks: 5 seconds, expressed in microseconds.
constexpr uint64_t DELAYED_WAKED_UP_TASK_TIME_INTERVAL = 5'000'000;
SCPUWorkerManager()49 SCPUWorkerManager::SCPUWorkerManager()
50 {
51     monitor = CPUManagerStrategy::CreateCPUMonitor(this);
52     (void)monitor->StartMonitor();
53 }
54 
~SCPUWorkerManager()55 SCPUWorkerManager::~SCPUWorkerManager()
56 {
57     tearDown = true;
58     for (auto qos = QoS::Min(); qos < QoS::Max(); ++qos) {
59         int try_cnt = MANAGER_DESTRUCT_TIMESOUT;
60         while (try_cnt-- > 0) {
61             pollersMtx[qos].unlock();
62             FFRTFacade::GetPPInstance().GetPoller(qos).WakeUp();
63             {
64                 auto& ctl = sleepCtl[qos];
65                 std::lock_guard lk(ctl.mutex);
66                 sleepCtl[qos].cv.notify_all();
67             }
68             {
69                 usleep(1000);
70                 std::shared_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
71                 if (groupCtl[qos].threads.empty()) {
72                     break;
73                 }
74             }
75         }
76 
77         if (try_cnt <= 0) {
78             FFRT_LOGE("erase qos[%d] threads failed", qos);
79         }
80     }
81     delete monitor;
82 }
83 
WorkerRetiredSimplified(WorkerThread * thread)84 void SCPUWorkerManager::WorkerRetiredSimplified(WorkerThread* thread)
85 {
86     pid_t pid = thread->Id();
87     int qos = static_cast<int>(thread->GetQos());
88 
89     bool isEmptyQosThreads = false;
90     {
91         std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
92         thread->SetExited(true);
93         thread->Detach();
94         auto worker = std::move(groupCtl[qos].threads[thread]);
95         int ret = groupCtl[qos].threads.erase(thread);
96         if (ret != 1) {
97             FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
98         }
99         isEmptyQosThreads = groupCtl[qos].threads.empty();
100         WorkerLeaveTg(QoS(qos), pid);
101 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
102         if (IsBlockAwareInit()) {
103             ret = BlockawareUnregister();
104             if (ret != 0) {
105                 FFRT_LOGE("blockaware unregister fail, ret[%d]", ret);
106             }
107         }
108 #endif
109         worker = nullptr;
110     }
111 
112     // qos has no worker, start delay worker to monitor task
113     if (isEmptyQosThreads) {
114         std::shared_mutex& exitMtx = GetExitMtx();
115         exitMtx.lock_shared();
116         if (GetExitFlag()) {
117             exitMtx.unlock_shared();
118             return;
119         }
120         FFRT_LOGI("qos has no worker, start delay worker to monitor task, qos %d", qos);
121         AddDelayedTask(qos);
122         exitMtx.unlock_shared();
123     }
124 }
125 
PickUpTaskBatch(WorkerThread * thread)126 TaskBase* SCPUWorkerManager::PickUpTaskBatch(WorkerThread* thread)
127 {
128     if (tearDown) {
129         return nullptr;
130     }
131 
132     auto& sched = FFRTFacade::GetSchedInstance()->GetScheduler(thread->GetQos());
133     auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
134     std::lock_guard lg(*lock);
135     TaskBase* task = sched.PickNextTask();
136 
137 #ifdef FFRT_LOCAL_QUEUE_ENABLE
138     if (task == nullptr) {
139         return nullptr;
140     }
141 
142     int wakedWorkerNum = monitor->WakedWorkerNum(thread->GetQos());
143     // when there is only one worker, the global queue is equivalent to the local queue
144     // prevents local queue tasks that cannot be executed due to blocking tasks
145     if (wakedWorkerNum <= 1) {
146         return task;
147     }
148 
149     SpmcQueue* queue = &(reinterpret_cast<CPUWorker*>(thread)->localFifo);
150     int expectedTask = GetTaskCount(thread->GetQos()) / wakedWorkerNum - 1;
151     for (int i = 0; i < expectedTask; i++) {
152         if (queue->GetLength() == queue->GetCapacity()) {
153             return task;
154         }
155 
156         TaskBase* task2local = sched.PickNextTask();
157         if (task2local == nullptr) {
158             return task;
159         }
160 
161         queue->PushTail(task2local);
162     }
163 #endif
164 
165     return task;
166 }
167 
AddDelayedTask(int qos)168 void SCPUWorkerManager::AddDelayedTask(int qos)
169 {
170     WaitUntilEntry* we = new (SimpleAllocator<WaitUntilEntry>::AllocMem()) WaitUntilEntry();
171     we->tp = std::chrono::steady_clock::now() + std::chrono::microseconds(DELAYED_WAKED_UP_TASK_TIME_INTERVAL);
172     we->cb = ([this, qos](WaitEntry* we) {
173         int taskCount = GetTaskCount(QoS(qos));
174         std::unique_lock<std::shared_mutex> lck(groupCtl[qos].tgMutex);
175         bool isEmpty = groupCtl[qos].threads.empty();
176         lck.unlock();
177 
178         if (!isEmpty) {
179             SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
180             FFRT_LOGW("qos[%d] has worker, no need add delayed task", qos);
181             return;
182         }
183 
184         if (taskCount != 0) {
185             FFRT_LOGI("notify task, qos %d", qos);
186             FFRTFacade::GetEUInstance().NotifyTaskAdded(QoS(qos));
187         } else {
188             AddDelayedTask(qos);
189         }
190         SimpleAllocator<WaitUntilEntry>::FreeMem(static_cast<WaitUntilEntry*>(we));
191     });
192 
193     if (!DelayedWakeup(we->tp, we, we->cb)) {
194         SimpleAllocator<WaitUntilEntry>::FreeMem(we);
195         FFRT_LOGW("add delyaed task failed, qos %d", qos);
196     }
197 }
198 
WorkerIdleAction(const WorkerThread * thread)199 WorkerAction SCPUWorkerManager::WorkerIdleAction(const WorkerThread* thread)
200 {
201     if (tearDown) {
202         return WorkerAction::RETIRE;
203     }
204 
205     auto& ctl = sleepCtl[thread->GetQos()];
206     std::unique_lock lk(ctl.mutex);
207     monitor->IntoSleep(thread->GetQos());
208 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
209     BlockawareEnterSleeping();
210 #endif
211     if (ctl.cv.wait_for(lk, std::chrono::seconds(waiting_seconds), [this, thread] {
212         bool taskExistence = GetTaskCount(thread->GetQos());
213         bool needPoll = !FFRTFacade::GetPPInstance().GetPoller(thread->GetQos()).DetermineEmptyMap() &&
214             (polling_[thread->GetQos()] == 0);
215         return tearDown || taskExistence || needPoll;
216         })) {
217         monitor->WakeupSleep(thread->GetQos());
218 #ifdef FFRT_WORKERS_DYNAMIC_SCALING
219         BlockawareLeaveSleeping();
220 #endif
221         return WorkerAction::RETRY;
222     } else {
223 #if !defined(SUPPORT_WORKER_DESTRUCT)
224         monitor->IntoDeepSleep(thread->GetQos());
225         CoStackFree();
226         if (monitor->IsExceedDeepSleepThreshold()) {
227             ffrt::CoRoutineReleaseMem();
228         }
229         ctl.cv.wait(lk, [this, thread] {
230             return tearDown || GetTaskCount(thread->GetQos()) ||
231             reinterpret_cast<const CPUWorker*>(thread)->priority_task ||
232             reinterpret_cast<const CPUWorker*>(thread)->localFifo.GetLength();
233             });
234         monitor->WakeupDeepSleep(thread->GetQos());
235         return WorkerAction::RETRY;
236 #else
237         monitor->TimeoutCount(thread->GetQos());
238         return WorkerAction::RETIRE;
239 #endif
240     }
241 }
242 
WorkerPrepare(WorkerThread * thread)243 void SCPUWorkerManager::WorkerPrepare(WorkerThread* thread)
244 {
245     WorkerJoinTg(thread->GetQos(), thread->Id());
246 }
247 
WakeupWorkers(const QoS & qos)248 void SCPUWorkerManager::WakeupWorkers(const QoS& qos)
249 {
250     if (tearDown) {
251         FFRT_LOGE("CPU Worker Manager exit");
252         return;
253     }
254 
255     auto& ctl = sleepCtl[qos()];
256     ctl.cv.notify_one();
257 }
258 } // namespace ffrt
259