1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <climits>
17 #include <cstring>
18 #include <sys/stat.h>
19 #include "eu/cpu_monitor.h"
20 #include "eu/cpu_manager_interface.h"
21 #include "sched/scheduler.h"
22 #include "sched/workgroup_internal.h"
23 #include "eu/qos_interface.h"
24 #include "eu/cpuworker_manager.h"
25
26 namespace ffrt {
27
IncWorker(const QoS & qos)28 bool CPUWorkerManager::IncWorker(const QoS& qos)
29 {
30 std::unique_lock lock(groupCtl[qos()].tgMutex);
31 if (tearDown) {
32 return false;
33 }
34
35 auto worker = std::unique_ptr<WorkerThread>(new (std::nothrow) CPUWorker(qos, {
36 std::bind(&CPUWorkerManager::PickUpTask, this, std::placeholders::_1),
37 std::bind(&CPUWorkerManager::NotifyTaskPicked, this, std::placeholders::_1),
38 std::bind(&CPUWorkerManager::WorkerIdleAction, this, std::placeholders::_1),
39 std::bind(&CPUWorkerManager::WorkerRetired, this, std::placeholders::_1),
40 }));
41 if (worker == nullptr) {
42 FFRT_LOGE("Inc CPUWorker: create worker\n");
43 return false;
44 }
45 worker->WorkerSetup(worker.get(), qos);
46 WorkerJoinTg(qos, worker->Id());
47 groupCtl[qos()].threads[worker.get()] = std::move(worker);
48 return true;
49 }
50
WakeupWorkers(const QoS & qos)51 void CPUWorkerManager::WakeupWorkers(const QoS& qos)
52 {
53 if (tearDown) {
54 return;
55 }
56
57 auto& ctl = sleepCtl[qos()];
58 ctl.cv.notify_one();
59 }
60
GetTaskCount(const QoS & qos)61 int CPUWorkerManager::GetTaskCount(const QoS& qos)
62 {
63 auto& sched = FFRTScheduler::Instance()->GetScheduler(qos);
64 return sched.RQSize();
65 }
66
PickUpTask(WorkerThread * thread)67 TaskCtx* CPUWorkerManager::PickUpTask(WorkerThread* thread)
68 {
69 if (tearDown) {
70 return nullptr;
71 }
72
73 auto& sched = FFRTScheduler::Instance()->GetScheduler(thread->GetQos());
74 auto lock = GetSleepCtl(static_cast<int>(thread->GetQos()));
75 std::lock_guard lg(*lock);
76 return sched.PickNextTask();
77 }
78
NotifyTaskPicked(const WorkerThread * thread)79 void CPUWorkerManager::NotifyTaskPicked(const WorkerThread* thread)
80 {
81 monitor.Notify(thread->GetQos(), TaskNotifyType::TASK_PICKED);
82 }
83
WorkerRetired(WorkerThread * thread)84 void CPUWorkerManager::WorkerRetired(WorkerThread* thread)
85 {
86 pid_t pid = thread->Id();
87 int qos = static_cast<int>(thread->GetQos());
88 thread->SetExited(true);
89 thread->Detach();
90
91 {
92 std::unique_lock lock(groupCtl[qos].tgMutex);
93 auto worker = std::move(groupCtl[qos].threads[thread]);
94 size_t ret = groupCtl[qos].threads.erase(thread);
95 if (ret != 1) {
96 FFRT_LOGE("erase qos[%d] thread failed, %d elements removed", qos, ret);
97 }
98 WorkerLeaveTg(qos, pid);
99 worker = nullptr;
100 }
101 }
102
WorkerIdleAction(const WorkerThread * thread)103 WorkerAction CPUWorkerManager::WorkerIdleAction(const WorkerThread* thread)
104 {
105 if (tearDown) {
106 return WorkerAction::RETIRE;
107 }
108
109 auto& ctl = sleepCtl[thread->GetQos()];
110 std::unique_lock lk(ctl.mutex);
111 monitor.IntoSleep(thread->GetQos());
112 FFRT_LOGD("worker sleep");
113 #if defined(IDLE_WORKER_DESTRUCT)
114 if (ctl.cv.wait_for(lk, std::chrono::seconds(5),
115 [this, thread] {return tearDown || GetTaskCount(thread->GetQos());})) {
116 monitor.WakeupCount(thread->GetQos());
117 FFRT_LOGD("worker awake");
118 return WorkerAction::RETRY;
119 } else {
120 monitor.TimeoutCount(thread->GetQos());
121 FFRT_LOGD("worker exit");
122 return WorkerAction::RETIRE;
123 }
124 #else /* !IDLE_WORKER_DESTRUCT */
125 ctl.cv.wait(lk, [this, thread] {return tearDown || GetTaskCount(thread->GetQos());});
126 monitor.WakeupCount(thread->GetQos());
127 FFRT_LOGD("worker awake");
128 return WorkerAction::RETRY;
129 #endif /* IDLE_WORKER_DESTRUCT */
130 }
131
NotifyTaskAdded(const QoS & qos)132 void CPUWorkerManager::NotifyTaskAdded(const QoS& qos)
133 {
134 monitor.Notify(qos, TaskNotifyType::TASK_ADDED);
135 }
136
CPUWorkerManager()137 CPUWorkerManager::CPUWorkerManager() : monitor({
138 std::bind(&CPUWorkerManager::IncWorker, this, std::placeholders::_1),
139 std::bind(&CPUWorkerManager::WakeupWorkers, this, std::placeholders::_1),
140 std::bind(&CPUWorkerManager::GetTaskCount, this, std::placeholders::_1)})
141 {
142 groupCtl[qos_deadline_request].tg = std::unique_ptr<ThreadGroup>(new ThreadGroup());
143 }
144
WorkerJoinTg(const QoS & qos,pid_t pid)145 void CPUWorkerManager::WorkerJoinTg(const QoS& qos, pid_t pid)
146 {
147 if (qos == qos_user_interactive) {
148 (void)JoinWG(pid);
149 return;
150 }
151 auto& tgwrap = groupCtl[qos()];
152 if (!tgwrap.tg) {
153 return;
154 }
155
156 if ((tgwrap.tgRefCount) == 0) {
157 return;
158 }
159
160 tgwrap.tg->Join(pid);
161 }
162
WorkerLeaveTg(const QoS & qos,pid_t pid)163 void CPUWorkerManager::WorkerLeaveTg(const QoS& qos, pid_t pid)
164 {
165 if (qos == qos_user_interactive) {
166 return;
167 }
168 auto& tgwrap = groupCtl[qos()];
169 if (!tgwrap.tg) {
170 return;
171 }
172
173 if ((tgwrap.tgRefCount) == 0) {
174 return;
175 }
176
177 tgwrap.tg->Leave(pid);
178 }
179 } // namespace ffrt
180