1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "mpl_scheduler.h"
17 #include "mpl_timer.h"
18 #include "mpl_logging.h"
19 namespace maple {
20 constexpr double kAlternateUnits = 1000.0;
21 // ========== MplScheduler ==========
MplScheduler(const std::string & name)22 MplScheduler::MplScheduler(const std::string &name)
23 : schedulerName(name),
24 taskIdForAdd(0),
25 taskIdToRun(0),
26 taskIdExpected(0),
27 numberTasks(0),
28 numberTasksFinish(0),
29 isSchedulerSeq(false),
30 dumpTime(false),
31 statusFinish(kThreadRun)
32 {
33 }
34
Init()35 void MplScheduler::Init()
36 {
37 char *envStr = getenv("MP_DUMPTIME");
38 if (envStr != nullptr && atoi(envStr) == 1) {
39 dumpTime = true;
40 }
41 int ret = pthread_mutex_init(&mutexTaskIdsToRun, nullptr);
42 CHECK_FATAL(ret == 0, "pthread_mutex_init failed");
43 ret = pthread_mutex_init(&mutexTaskIdsToFinish, nullptr);
44 CHECK_FATAL(ret == 0, "pthread_mutex_init failed");
45 ret = pthread_mutex_init(&mutexGlobal, nullptr);
46 CHECK_FATAL(ret == 0, "pthread_mutex_init failed");
47 mutexTaskFinishProcess = PTHREAD_MUTEX_INITIALIZER;
48 conditionFinishProcess = PTHREAD_COND_INITIALIZER;
49 }
50
AddTask(MplTask & task)51 void MplScheduler::AddTask(MplTask &task)
52 {
53 task.SetTaskId(taskIdForAdd);
54 tbTasks.push_back(&task);
55 ++taskIdForAdd;
56 ++numberTasks;
57 }
58
GetTaskToRun()59 MplTask *MplScheduler::GetTaskToRun()
60 {
61 MplTask *task = nullptr;
62 int ret = pthread_mutex_lock(&mutexTaskIdsToRun);
63 CHECK_FATAL(ret == 0, "pthread_mutex_lock failed");
64 if (taskIdToRun < numberTasks) {
65 task = tbTasks[taskIdToRun++];
66 }
67 ret = pthread_mutex_unlock(&mutexTaskIdsToRun);
68 CHECK_FATAL(ret == 0, "pthread_mutex_unlock failed");
69 return task;
70 }
71
GetTaskIdsFinishSize()72 size_t MplScheduler::GetTaskIdsFinishSize()
73 {
74 int ret = pthread_mutex_lock(&mutexTaskIdsToFinish);
75 CHECK_FATAL(ret == 0, "pthread_mutex_lock failed");
76 size_t size = tbTaskIdsToFinish.size();
77 ret = pthread_mutex_unlock(&mutexTaskIdsToFinish);
78 CHECK_FATAL(ret == 0, "pthread_mutex_unlock failed");
79 return size;
80 }
81
GetTaskFinishFirst()82 MplTask *MplScheduler::GetTaskFinishFirst()
83 {
84 MplTask *task = nullptr;
85 int ret = pthread_mutex_lock(&mutexTaskIdsToFinish);
86 CHECK_FATAL(ret == 0, "pthread_mutex_lock failed");
87 if (!tbTaskIdsToFinish.empty()) {
88 task = tbTasks[*(tbTaskIdsToFinish.begin())];
89 }
90 ret = pthread_mutex_unlock(&mutexTaskIdsToFinish);
91 CHECK_FATAL(ret == 0, "pthread_mutex_unlock failed");
92 return task;
93 }
94
RemoveTaskFinish(uint32 id)95 void MplScheduler::RemoveTaskFinish(uint32 id)
96 {
97 int ret = pthread_mutex_lock(&mutexTaskIdsToFinish);
98 CHECK_FATAL(ret == 0, "pthread_mutex_lock failed");
99 tbTaskIdsToFinish.erase(id);
100 ret = pthread_mutex_unlock(&mutexTaskIdsToFinish);
101 CHECK_FATAL(ret == 0, "pthread_mutex_unlock failed");
102 }
103
TaskIdFinish(uint32 id)104 void MplScheduler::TaskIdFinish(uint32 id)
105 {
106 int ret = pthread_mutex_lock(&mutexTaskIdsToFinish);
107 CHECK_FATAL(ret == 0, "pthread_mutex_lock failed");
108 tbTaskIdsToFinish.insert(id);
109 ret = pthread_mutex_unlock(&mutexTaskIdsToFinish);
110 CHECK_FATAL(ret == 0, "pthread_mutex_unlock failed");
111 }
112
RunTask(uint32 threadsNum,bool seq)113 int MplScheduler::RunTask(uint32 threadsNum, bool seq)
114 {
115 isSchedulerSeq = seq;
116 if (threadsNum > 0) {
117 taskIdExpected = 0;
118 std::thread threads[threadsNum];
119 std::thread threadFinish;
120 for (uint32 i = 0; i < threadsNum; ++i) {
121 threads[i] = std::thread([this, i] { this->ThreadMain(i, EncodeThreadMainEnvironment(i)); });
122 }
123 if (isSchedulerSeq) {
124 threadFinish = std::thread([this] { this->ThreadFinish(EncodeThreadFinishEnvironment()); });
125 }
126 for (uint32 i = 0; i < threadsNum; ++i) {
127 threads[i].join();
128 }
129 if (isSchedulerSeq) {
130 threadFinish.join();
131 }
132 }
133 return 0;
134 }
135
FinishTask(const MplTask & task)136 int MplScheduler::FinishTask(const MplTask &task)
137 {
138 TaskIdFinish(task.GetTaskId());
139 return 0;
140 }
141
Reset()142 void MplScheduler::Reset()
143 {
144 tbTasks.clear();
145 tbTaskIdsToFinish.clear();
146 taskIdForAdd = 0;
147 taskIdToRun = 0;
148 taskIdExpected = 0;
149 numberTasks = 0;
150 numberTasksFinish = 0;
151 isSchedulerSeq = false;
152 }
153
ThreadMain(uint32 threadID,MplSchedulerParam * env)154 void MplScheduler::ThreadMain(uint32 threadID, MplSchedulerParam *env)
155 {
156 MPLTimer timerTotal;
157 MPLTimer timerRun;
158 MPLTimer timerToRun;
159 MPLTimer timerFinish;
160 double timeRun = 0.0;
161 double timeToRun = 0.0;
162 double timeFinish = 0.0;
163 if (dumpTime) {
164 timerTotal.Start();
165 }
166 DecodeThreadMainEnvironment(env);
167 CallbackThreadMainStart();
168 if (dumpTime) {
169 timerToRun.Start();
170 }
171 MplTask *task = GetTaskToRun();
172 if (dumpTime) {
173 timerToRun.Stop();
174 timeToRun += timerRun.ElapsedMicroseconds();
175 }
176 int ret = 0;
177 while (task != nullptr) {
178 if (dumpTime) {
179 timerRun.Start();
180 }
181 MplTaskParam *paramRun = CallbackGetTaskRunParam();
182 (void)task->Run(paramRun);
183 if (dumpTime) {
184 timerRun.Stop();
185 timeRun += timerRun.ElapsedMicroseconds();
186 timerFinish.Start();
187 }
188 if (isSchedulerSeq) {
189 ret = FinishTask(*task);
190 CHECK_FATAL(ret == 0, "task finish failed");
191 } else {
192 MplTaskParam *paramFinish = CallbackGetTaskFinishParam();
193 ret = task->Finish(paramFinish);
194 CHECK_FATAL(ret == 0, "task finish failed");
195 }
196 if (dumpTime) {
197 timerFinish.Stop();
198 timeFinish += timerFinish.ElapsedMicroseconds();
199 timerToRun.Start();
200 }
201 task = GetTaskToRun();
202 if (dumpTime) {
203 timerToRun.Stop();
204 timeToRun += timerToRun.ElapsedMicroseconds();
205 }
206 }
207 CallbackThreadMainEnd();
208 if (dumpTime) {
209 timerTotal.Stop();
210 GlobalLock();
211 LogInfo::MapleLogger() << "MP TimeDump(" << schedulerName << ")::Thread" << threadID << "::ThreadMain "
212 << (timerTotal.ElapsedMicroseconds() / kAlternateUnits) << "ms" << std::endl;
213 LogInfo::MapleLogger() << "MP TimeDump(" << schedulerName << ")::Thread" << threadID
214 << "::ThreadMain::Task::Run " << (timeRun / kAlternateUnits) << "ms" << std::endl;
215 LogInfo::MapleLogger() << "MP TimeDump(" << schedulerName << ")::Thread" << threadID
216 << "::ThreadMain::Task::ToRun " << (timeToRun / kAlternateUnits) << "ms" << std::endl;
217 LogInfo::MapleLogger() << "MP TimeDump(" << schedulerName << ")::Thread" << threadID
218 << "::ThreadMain::Task::Finish " << (timeFinish / kAlternateUnits) << "ms" << std::endl;
219 GlobalUnlock();
220 }
221 }
222
ThreadFinish(MplSchedulerParam * env)223 void MplScheduler::ThreadFinish(MplSchedulerParam *env)
224 {
225 statusFinish = kThreadRun;
226 DecodeThreadFinishEnvironment(env);
227 CallbackThreadFinishStart();
228 MplTask *task = nullptr;
229 int ret = 0;
230 uint32 taskId;
231 while (numberTasksFinish < numberTasks) {
232 while (true) {
233 if (GetTaskIdsFinishSize() == 0) {
234 break;
235 }
236 task = GetTaskFinishFirst();
237 CHECK_FATAL(task != nullptr, "null ptr check");
238 taskId = task->GetTaskId();
239 if (isSchedulerSeq) {
240 if (taskId != taskIdExpected) {
241 break;
242 }
243 }
244 MplTaskParam *paramFinish = CallbackGetTaskFinishParam();
245 ret = task->Finish(paramFinish);
246 CHECK_FATAL(ret == 0, "task finish failed");
247 ++numberTasksFinish;
248 if (isSchedulerSeq) {
249 ++taskIdExpected;
250 }
251 RemoveTaskFinish(task->GetTaskId());
252 }
253 }
254 CallbackThreadFinishEnd();
255 }
256 } // namespace maple