• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "mpl_scheduler.h"
#include <vector>
#include "mpl_timer.h"
#include "mpl_logging.h"
19 namespace maple {
// Divisor applied to elapsed-microsecond readings so the time dump in ThreadMain prints milliseconds.
20 constexpr double kAlternateUnits = 1000.0;
21 // ========== MplScheduler ==========
MplScheduler(const std::string & name)22 MplScheduler::MplScheduler(const std::string &name)
23     : schedulerName(name),
24       taskIdForAdd(0),
25       taskIdToRun(0),
26       taskIdExpected(0),
27       numberTasks(0),
28       numberTasksFinish(0),
29       isSchedulerSeq(false),
30       dumpTime(false),
31       statusFinish(kThreadRun)
32 {
33 }
34 
Init()35 void MplScheduler::Init()
36 {
37     char *envStr = getenv("MP_DUMPTIME");
38     if (envStr != nullptr && atoi(envStr) == 1) {
39         dumpTime = true;
40     }
41     int ret = pthread_mutex_init(&mutexTaskIdsToRun, nullptr);
42     CHECK_FATAL(ret == 0, "pthread_mutex_init failed");
43     ret = pthread_mutex_init(&mutexTaskIdsToFinish, nullptr);
44     CHECK_FATAL(ret == 0, "pthread_mutex_init failed");
45     ret = pthread_mutex_init(&mutexGlobal, nullptr);
46     CHECK_FATAL(ret == 0, "pthread_mutex_init failed");
47     mutexTaskFinishProcess = PTHREAD_MUTEX_INITIALIZER;
48     conditionFinishProcess = PTHREAD_COND_INITIALIZER;
49 }
50 
AddTask(MplTask & task)51 void MplScheduler::AddTask(MplTask &task)
52 {
53     task.SetTaskId(taskIdForAdd);
54     tbTasks.push_back(&task);
55     ++taskIdForAdd;
56     ++numberTasks;
57 }
58 
GetTaskToRun()59 MplTask *MplScheduler::GetTaskToRun()
60 {
61     MplTask *task = nullptr;
62     int ret = pthread_mutex_lock(&mutexTaskIdsToRun);
63     CHECK_FATAL(ret == 0, "pthread_mutex_lock failed");
64     if (taskIdToRun < numberTasks) {
65         task = tbTasks[taskIdToRun++];
66     }
67     ret = pthread_mutex_unlock(&mutexTaskIdsToRun);
68     CHECK_FATAL(ret == 0, "pthread_mutex_unlock failed");
69     return task;
70 }
71 
GetTaskIdsFinishSize()72 size_t MplScheduler::GetTaskIdsFinishSize()
73 {
74     int ret = pthread_mutex_lock(&mutexTaskIdsToFinish);
75     CHECK_FATAL(ret == 0, "pthread_mutex_lock failed");
76     size_t size = tbTaskIdsToFinish.size();
77     ret = pthread_mutex_unlock(&mutexTaskIdsToFinish);
78     CHECK_FATAL(ret == 0, "pthread_mutex_unlock failed");
79     return size;
80 }
81 
GetTaskFinishFirst()82 MplTask *MplScheduler::GetTaskFinishFirst()
83 {
84     MplTask *task = nullptr;
85     int ret = pthread_mutex_lock(&mutexTaskIdsToFinish);
86     CHECK_FATAL(ret == 0, "pthread_mutex_lock failed");
87     if (!tbTaskIdsToFinish.empty()) {
88         task = tbTasks[*(tbTaskIdsToFinish.begin())];
89     }
90     ret = pthread_mutex_unlock(&mutexTaskIdsToFinish);
91     CHECK_FATAL(ret == 0, "pthread_mutex_unlock failed");
92     return task;
93 }
94 
RemoveTaskFinish(uint32 id)95 void MplScheduler::RemoveTaskFinish(uint32 id)
96 {
97     int ret = pthread_mutex_lock(&mutexTaskIdsToFinish);
98     CHECK_FATAL(ret == 0, "pthread_mutex_lock failed");
99     tbTaskIdsToFinish.erase(id);
100     ret = pthread_mutex_unlock(&mutexTaskIdsToFinish);
101     CHECK_FATAL(ret == 0, "pthread_mutex_unlock failed");
102 }
103 
TaskIdFinish(uint32 id)104 void MplScheduler::TaskIdFinish(uint32 id)
105 {
106     int ret = pthread_mutex_lock(&mutexTaskIdsToFinish);
107     CHECK_FATAL(ret == 0, "pthread_mutex_lock failed");
108     tbTaskIdsToFinish.insert(id);
109     ret = pthread_mutex_unlock(&mutexTaskIdsToFinish);
110     CHECK_FATAL(ret == 0, "pthread_mutex_unlock failed");
111 }
112 
RunTask(uint32 threadsNum,bool seq)113 int MplScheduler::RunTask(uint32 threadsNum, bool seq)
114 {
115     isSchedulerSeq = seq;
116     if (threadsNum > 0) {
117         taskIdExpected = 0;
118         std::thread threads[threadsNum];
119         std::thread threadFinish;
120         for (uint32 i = 0; i < threadsNum; ++i) {
121             threads[i] = std::thread([this, i] { this->ThreadMain(i, EncodeThreadMainEnvironment(i)); });
122         }
123         if (isSchedulerSeq) {
124             threadFinish = std::thread([this] { this->ThreadFinish(EncodeThreadFinishEnvironment()); });
125         }
126         for (uint32 i = 0; i < threadsNum; ++i) {
127             threads[i].join();
128         }
129         if (isSchedulerSeq) {
130             threadFinish.join();
131         }
132     }
133     return 0;
134 }
135 
FinishTask(const MplTask & task)136 int MplScheduler::FinishTask(const MplTask &task)
137 {
138     TaskIdFinish(task.GetTaskId());
139     return 0;
140 }
141 
Reset()142 void MplScheduler::Reset()
143 {
144     tbTasks.clear();
145     tbTaskIdsToFinish.clear();
146     taskIdForAdd = 0;
147     taskIdToRun = 0;
148     taskIdExpected = 0;
149     numberTasks = 0;
150     numberTasksFinish = 0;
151     isSchedulerSeq = false;
152 }
153 
ThreadMain(uint32 threadID,MplSchedulerParam * env)154 void MplScheduler::ThreadMain(uint32 threadID, MplSchedulerParam *env)
155 {
156     MPLTimer timerTotal;
157     MPLTimer timerRun;
158     MPLTimer timerToRun;
159     MPLTimer timerFinish;
160     double timeRun = 0.0;
161     double timeToRun = 0.0;
162     double timeFinish = 0.0;
163     if (dumpTime) {
164         timerTotal.Start();
165     }
166     DecodeThreadMainEnvironment(env);
167     CallbackThreadMainStart();
168     if (dumpTime) {
169         timerToRun.Start();
170     }
171     MplTask *task = GetTaskToRun();
172     if (dumpTime) {
173         timerToRun.Stop();
174         timeToRun += timerRun.ElapsedMicroseconds();
175     }
176     int ret = 0;
177     while (task != nullptr) {
178         if (dumpTime) {
179             timerRun.Start();
180         }
181         MplTaskParam *paramRun = CallbackGetTaskRunParam();
182         (void)task->Run(paramRun);
183         if (dumpTime) {
184             timerRun.Stop();
185             timeRun += timerRun.ElapsedMicroseconds();
186             timerFinish.Start();
187         }
188         if (isSchedulerSeq) {
189             ret = FinishTask(*task);
190             CHECK_FATAL(ret == 0, "task finish failed");
191         } else {
192             MplTaskParam *paramFinish = CallbackGetTaskFinishParam();
193             ret = task->Finish(paramFinish);
194             CHECK_FATAL(ret == 0, "task finish failed");
195         }
196         if (dumpTime) {
197             timerFinish.Stop();
198             timeFinish += timerFinish.ElapsedMicroseconds();
199             timerToRun.Start();
200         }
201         task = GetTaskToRun();
202         if (dumpTime) {
203             timerToRun.Stop();
204             timeToRun += timerToRun.ElapsedMicroseconds();
205         }
206     }
207     CallbackThreadMainEnd();
208     if (dumpTime) {
209         timerTotal.Stop();
210         GlobalLock();
211         LogInfo::MapleLogger() << "MP TimeDump(" << schedulerName << ")::Thread" << threadID << "::ThreadMain "
212                                << (timerTotal.ElapsedMicroseconds() / kAlternateUnits) << "ms" << std::endl;
213         LogInfo::MapleLogger() << "MP TimeDump(" << schedulerName << ")::Thread" << threadID
214                                << "::ThreadMain::Task::Run " << (timeRun / kAlternateUnits) << "ms" << std::endl;
215         LogInfo::MapleLogger() << "MP TimeDump(" << schedulerName << ")::Thread" << threadID
216                                << "::ThreadMain::Task::ToRun " << (timeToRun / kAlternateUnits) << "ms" << std::endl;
217         LogInfo::MapleLogger() << "MP TimeDump(" << schedulerName << ")::Thread" << threadID
218                                << "::ThreadMain::Task::Finish " << (timeFinish / kAlternateUnits) << "ms" << std::endl;
219         GlobalUnlock();
220     }
221 }
222 
ThreadFinish(MplSchedulerParam * env)223 void MplScheduler::ThreadFinish(MplSchedulerParam *env)
224 {
225     statusFinish = kThreadRun;
226     DecodeThreadFinishEnvironment(env);
227     CallbackThreadFinishStart();
228     MplTask *task = nullptr;
229     int ret = 0;
230     uint32 taskId;
231     while (numberTasksFinish < numberTasks) {
232         while (true) {
233             if (GetTaskIdsFinishSize() == 0) {
234                 break;
235             }
236             task = GetTaskFinishFirst();
237             CHECK_FATAL(task != nullptr, "null ptr check");
238             taskId = task->GetTaskId();
239             if (isSchedulerSeq) {
240                 if (taskId != taskIdExpected) {
241                     break;
242                 }
243             }
244             MplTaskParam *paramFinish = CallbackGetTaskFinishParam();
245             ret = task->Finish(paramFinish);
246             CHECK_FATAL(ret == 0, "task finish failed");
247             ++numberTasksFinish;
248             if (isSchedulerSeq) {
249                 ++taskIdExpected;
250             }
251             RemoveTaskFinish(task->GetTaskId());
252         }
253     }
254     CallbackThreadFinishEnd();
255 }
256 }  // namespace maple