1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #define MEDIA_TASK_THREAD
16 #define HST_LOG_TAG "Task"
17 #include "osal/task/task.h"
18 #include "osal/task/taskInner.h"
19 #include "osal/task/thread.h"
20 #include "osal/task/pipeline_threadpool.h"
21 #include "osal/utils/util.h"
22 #include "cpp_ext/memory_ext.h"
23 #include "common/log.h"
24
25 #include <mutex>
26
27 namespace {
28 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "TaskInner" };
29 }
30
31 namespace OHOS {
32 namespace Media {
33 namespace {
34 constexpr int64_t INVALID_DELAY_TIME_US = 10000000; // 10s
35 }
36 static std::atomic<uint16_t> singletonTaskId = 0;
37
SleepInTask(unsigned ms)38 void TaskInner::SleepInTask(unsigned ms)
39 {
40 OSAL::SleepFor(ms);
41 }
42
GetNowUs()43 static int64_t GetNowUs()
44 {
45 auto now = std::chrono::steady_clock::now();
46 return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
47 }
48
TaskInner(const std::string & name,const std::string & groupId,TaskType type,TaskPriority priority,bool singleLoop)49 TaskInner::TaskInner(const std::string& name, const std::string& groupId, TaskType type, TaskPriority priority,
50 bool singleLoop)
51 : name_(std::move(name)), runningState_(RunningState::PAUSED), singleLoop_(singleLoop)
52 {
53 MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " groupId:" PUBLIC_LOG_S " type:%{public}d ctor",
54 name_.c_str(), groupId.c_str(), type);
55 if (type == TaskType::SINGLETON) {
56 std::string newName = name_ + std::to_string(++singletonTaskId);
57 pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(newName, type, priority);
58 } else {
59 pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(groupId, type, priority);
60 }
61 }
62
Init()63 void TaskInner::Init()
64 {
65 MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " Init", name_.c_str());
66 pipelineThread_->AddTask(shared_from_this());
67 }
68
DeInit()69 void TaskInner::DeInit()
70 {
71 MEDIA_LOG_I_T(PUBLIC_LOG_S " DeInit", name_.c_str());
72 pipelineThread_->RemoveTask(shared_from_this());
73 {
74 if (pipelineThread_->IsRunningInSelf()) {
75 runningState_ = RunningState::STOPPED;
76 topProcessUs_ = -1;
77 } else {
78 AutoLock lock1(jobMutex_);
79 AutoLock lock2(stateMutex_);
80 runningState_ = RunningState::STOPPED;
81 topProcessUs_ = -1;
82 }
83 }
84 MEDIA_LOG_I_T(PUBLIC_LOG_S " DeInit done", name_.c_str());
85 }
86
~TaskInner()87 TaskInner::~TaskInner()
88 {
89 MEDIA_LOG_D_T(PUBLIC_LOG_S " dtor", name_.c_str());
90 }
91
UpdateDelayTime(int64_t delayUs)92 void TaskInner::UpdateDelayTime(int64_t delayUs)
93 {
94 if (!singleLoop_) {
95 MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime do nothing", name_.c_str());
96 return;
97 }
98 MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime enter topProcessUs:" PUBLIC_LOG_D64
99 ", delayUs:" PUBLIC_LOG_D64, name_.c_str(), topProcessUs_, delayUs);
100 pipelineThread_->LockJobState();
101 AutoLock lock(stateMutex_);
102 if (runningState_ != RunningState::STARTED) {
103 pipelineThread_->UnLockJobState(false);
104 return;
105 }
106 topProcessUs_ = GetNowUs() + delayUs;
107 pipelineThread_->UnLockJobState(true);
108 MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime exit topProcessUs:" PUBLIC_LOG_D64,
109 name_.c_str(), topProcessUs_);
110 }
111
Start()112 void TaskInner::Start()
113 {
114 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Start", name_.c_str());
115 pipelineThread_->LockJobState();
116 AutoLock lock(stateMutex_);
117 runningState_ = RunningState::STARTED;
118 if (singleLoop_) {
119 if (!job_) {
120 MEDIA_LOG_D_T("task " PUBLIC_LOG_S " Start, job invalid", name_.c_str());
121 }
122 topProcessUs_ = GetNowUs();
123 } else {
124 UpdateTop();
125 }
126 pipelineThread_->UnLockJobState(true);
127 MEDIA_LOG_I_FALSE_D(isStateLogEnabled_.load(), "task " PUBLIC_LOG_S " Start done, topProcessUs:%{public}" PRId64,
128 name_.c_str(), topProcessUs_);
129 }
130
Stop()131 void TaskInner::Stop()
132 {
133 if (pipelineThread_->IsRunningInSelf()) {
134 MEDIA_LOG_W_T(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
135 runningState_ = RunningState::STOPPED;
136 topProcessUs_ = -1;
137 return;
138 }
139 MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " Stop", name_.c_str());
140 AutoLock lock1(jobMutex_);
141 pipelineThread_->LockJobState();
142 AutoLock lock2(stateMutex_);
143 if (runningState_.load() == RunningState::STOPPED) {
144 pipelineThread_->UnLockJobState(false);
145 return;
146 }
147 runningState_ = RunningState::STOPPED;
148 topProcessUs_ = -1;
149 pipelineThread_->UnLockJobState(true);
150 MEDIA_LOG_I_T(PUBLIC_LOG_S " Stop <<", name_.c_str());
151 }
152
StopAsync()153 void TaskInner::StopAsync()
154 {
155 if (pipelineThread_->IsRunningInSelf()) {
156 MEDIA_LOG_W_T(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
157 runningState_ = RunningState::STOPPED;
158 topProcessUs_ = -1;
159 return;
160 }
161 MEDIA_LOG_I_T(PUBLIC_LOG_S " StopAsync", name_.c_str());
162 pipelineThread_->LockJobState();
163 AutoLock lock(stateMutex_);
164 bool stateChanged = false;
165 if (runningState_.load() != RunningState::STOPPED) {
166 runningState_ = RunningState::STOPPED;
167 topProcessUs_ = -1;
168 stateChanged = true;
169 }
170 pipelineThread_->UnLockJobState(stateChanged);
171 }
172
Pause()173 void TaskInner::Pause()
174 {
175 if (pipelineThread_->IsRunningInSelf()) {
176 RunningState state = runningState_.load();
177 if (state == RunningState::STARTED) {
178 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
179 PUBLIC_LOG_S " Pause done in self task", name_.c_str());
180 runningState_ = RunningState::PAUSED;
181 topProcessUs_ = -1;
182 return;
183 } else {
184 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
185 PUBLIC_LOG_S " Pause skip in self task, curret State: " PUBLIC_LOG_D32, name_.c_str(), state);
186 return;
187 }
188 }
189 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause", name_.c_str());
190 AutoLock lock1(jobMutex_);
191 pipelineThread_->LockJobState();
192 AutoLock lock2(stateMutex_);
193 RunningState state = runningState_.load();
194 if (state != RunningState::STARTED) {
195 pipelineThread_->UnLockJobState(false);
196 return;
197 }
198 runningState_ = RunningState::PAUSED;
199 topProcessUs_ = -1;
200 pipelineThread_->UnLockJobState(true);
201 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause done.", name_.c_str());
202 }
203
PauseAsync()204 void TaskInner::PauseAsync()
205 {
206 if (pipelineThread_->IsRunningInSelf()) {
207 RunningState state = runningState_.load();
208 if (state == RunningState::STARTED) {
209 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
210 PUBLIC_LOG_S " PauseAsync done in self task", name_.c_str());
211 runningState_ = RunningState::PAUSED;
212 topProcessUs_ = -1;
213 return;
214 } else {
215 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
216 PUBLIC_LOG_S " PauseAsync skip in self task, curretState:%{public}d", name_.c_str(), state);
217 return;
218 }
219 }
220 MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " PauseAsync", name_.c_str());
221 pipelineThread_->LockJobState();
222 AutoLock lock(stateMutex_);
223 bool stateChanged = false;
224 if (runningState_.load() == RunningState::STARTED) {
225 runningState_ = RunningState::PAUSED;
226 topProcessUs_ = -1;
227 stateChanged = true;
228 }
229 pipelineThread_->UnLockJobState(stateChanged);
230 }
231
RegisterJob(const std::function<int64_t ()> & job)232 void TaskInner::RegisterJob(const std::function<int64_t()>& job)
233 {
234 MEDIA_LOG_I_T(PUBLIC_LOG_S " RegisterHandler", name_.c_str());
235 job_ = std::move(job);
236 }
237
SubmitJobOnce(const std::function<void ()> & job,int64_t delayUs,bool wait)238 void TaskInner::SubmitJobOnce(const std::function<void()>& job, int64_t delayUs, bool wait)
239 {
240 MEDIA_LOG_D_T(PUBLIC_LOG_S " SubmitJobOnce", name_.c_str());
241 int64_t time = InsertJob(job, delayUs, false);
242 if (wait) {
243 AutoLock lock(stateMutex_);
244 replyCond_.Wait(lock, [this, time] { return msgQueue_.find(time) == msgQueue_.end(); });
245 }
246 }
247
SubmitJob(const std::function<void ()> & job,int64_t delayUs,bool wait)248 void TaskInner::SubmitJob(const std::function<void()>& job, int64_t delayUs, bool wait)
249 {
250 MEDIA_LOG_D_T(PUBLIC_LOG_S " SubmitJob delayUs:%{public}" PRId64, name_.c_str(), delayUs);
251 int64_t time = InsertJob(job, delayUs, true);
252 if (wait) {
253 AutoLock lock(stateMutex_);
254 replyCond_.Wait(lock, [this, time] { return jobQueue_.find(time) == jobQueue_.end(); });
255 }
256 }
257
UpdateTop()258 void TaskInner::UpdateTop()
259 {
260 // jobQueue_ is only handled in STARTED state, msgQueue_ always got handled.
261 if (msgQueue_.empty() && ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty())) {
262 topProcessUs_ = -1;
263 return;
264 }
265 if (msgQueue_.empty()) {
266 topProcessUs_ = jobQueue_.begin()->first;
267 topIsJob_ = true;
268 } else if ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty()) {
269 topProcessUs_ = msgQueue_.begin()->first;
270 topIsJob_ = false;
271 } else {
272 int64_t msgProcessTime = msgQueue_.begin()->first;
273 int64_t jobProcessTime = jobQueue_.begin()->first;
274 int64_t nowUs = GetNowUs();
275 if (msgProcessTime <= nowUs || msgProcessTime <= jobProcessTime) {
276 topProcessUs_ = msgProcessTime;
277 topIsJob_ = false;
278 } else {
279 topProcessUs_ = jobProcessTime;
280 topIsJob_ = true;
281 }
282 }
283 }
284
NextJobUs()285 int64_t TaskInner::NextJobUs()
286 {
287 AutoLock lock(stateMutex_);
288 return topProcessUs_;
289 }
290
HandleJob()291 void TaskInner::HandleJob()
292 {
293 AutoLock lock(jobMutex_);
294 if (singleLoop_) {
295 stateMutex_.lock();
296 int64_t currentTopProcessUs = topProcessUs_;
297 if (runningState_.load() == RunningState::PAUSED || runningState_.load() == RunningState::STOPPED) {
298 topProcessUs_ = -1;
299 stateMutex_.unlock();
300 return;
301 }
302 // unlock stateMutex otherwise pauseAsync/stopAsync function will wait job finish.
303 stateMutex_.unlock();
304 int64_t nextDelay = (!job_) ? INVALID_DELAY_TIME_US : job_();
305
306 AutoLock lock(stateMutex_);
307 // if topProcessUs_ is -1, we already pause/stop in job_()
308 // if topProcessUs_ is changed, we should ignore the returned delay time.
309 if (topProcessUs_ != -1 && currentTopProcessUs == topProcessUs_) {
310 topProcessUs_ = GetNowUs() + nextDelay;
311 }
312 } else {
313 std::function<void()> nextJob;
314 stateMutex_.lock();
315 if (topIsJob_) {
316 // Without this check, job may be executed in NON STARTED state,
317 // i.e. after Pause called, one job may be executed unexpectedly.
318 FALSE_EXEC_RETURN_MSG(runningState_.load() == RunningState::STARTED, stateMutex_.unlock(),
319 "not execute job, " PUBLIC_LOG_S " in state " PUBLIC_LOG_D32,
320 name_.c_str(), static_cast<int>(runningState_.load()));
321
322 nextJob = std::move(jobQueue_.begin()->second);
323 jobQueue_.erase(jobQueue_.begin());
324 } else {
325 nextJob = std::move(msgQueue_.begin()->second);
326 msgQueue_.erase(msgQueue_.begin());
327 }
328 {
329 // unlock stateMutex otherwise pauseAsync/stopAsync function will wait job finish.
330 stateMutex_.unlock();
331 nextJob();
332 replyCond_.NotifyAll();
333 }
334 AutoLock lock(stateMutex_);
335 UpdateTop();
336 }
337 }
338
UpdateThreadPriority(const uint32_t newPriority,const std::string & strBundleName)339 void TaskInner::UpdateThreadPriority(const uint32_t newPriority, const std::string &strBundleName)
340 {
341 FALSE_RETURN(pipelineThread_ != nullptr);
342 bool tmpFlag = singleLoop_;
343 singleLoop_ = false;
344 SubmitJobOnce(
345 [this, newPriority, strBundleName] { pipelineThread_->UpdateThreadPriority(newPriority, strBundleName); },
346 0, true);
347 singleLoop_ = tmpFlag;
348 }
349
InsertJob(const std::function<void ()> & job,int64_t delayUs,bool inJobQueue)350 int64_t TaskInner::InsertJob(const std::function<void()>& job, int64_t delayUs, bool inJobQueue)
351 {
352 pipelineThread_->LockJobState();
353 AutoLock lock(stateMutex_);
354 int64_t nowUs = GetNowUs();
355 if (delayUs < 0) {
356 delayUs = 0;
357 }
358 int64_t processTime = nowUs + delayUs;
359 if (inJobQueue) {
360 while (jobQueue_.find(processTime) != jobQueue_.end()) { // To prevent dropping job unexpectedly
361 MEDIA_LOG_W_T("DUPLICATIVE jobQueue_ TIMESTAMP!!!");
362 processTime++;
363 }
364 jobQueue_[processTime] = std::move(job);
365 } else {
366 while (msgQueue_.find(processTime) != msgQueue_.end()) { // To prevent dropping job unexpectedly
367 MEDIA_LOG_W_T("DUPLICATIVE msgQueue_ TIMESTAMP!!!");
368 processTime++;
369 }
370 msgQueue_[processTime] = std::move(job);
371 }
372 int64_t lastProcessUs = topProcessUs_;
373 // update top if only new job is more emgercy or jobqueue is empty
374 if (processTime <= topProcessUs_ || topProcessUs_ == -1) {
375 UpdateTop();
376 }
377 // if top is updated we should wake pipeline thread
378 pipelineThread_->UnLockJobState(lastProcessUs != topProcessUs_);
379 return processTime;
380 }
381 } // namespace Media
382 } // namespace OHOS
383