• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define MEDIA_TASK_THREAD
16 #define HST_LOG_TAG "Task"
17 #include "osal/task/task.h"
18 #include "osal/task/taskInner.h"
19 #include "osal/task/thread.h"
20 #include "osal/task/pipeline_threadpool.h"
21 #include "osal/utils/util.h"
22 #include "cpp_ext/memory_ext.h"
23 #include "common/log.h"
24 
25 #include <mutex>
26 
27 namespace {
28 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "TaskInner" };
29 }
30 
31 namespace OHOS {
32 namespace Media {
33 namespace {
34     constexpr int64_t INVALID_DELAY_TIME_US = 10000000; // 10s
35 }
36 static std::atomic<uint16_t> singletonTaskId = 0;
37 
SleepInTask(unsigned ms)38 void TaskInner::SleepInTask(unsigned ms)
39 {
40     OSAL::SleepFor(ms);
41 }
42 
GetNowUs()43 static int64_t GetNowUs()
44 {
45     auto now = std::chrono::steady_clock::now();
46     return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
47 }
48 
TaskInner(const std::string & name,const std::string & groupId,TaskType type,TaskPriority priority,bool singleLoop)49 TaskInner::TaskInner(const std::string& name, const std::string& groupId, TaskType type, TaskPriority priority,
50     bool singleLoop)
51     : name_(std::move(name)), runningState_(RunningState::PAUSED), singleLoop_(singleLoop)
52 {
53     MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " groupId:" PUBLIC_LOG_S " type:%{public}d ctor",
54         name_.c_str(), groupId.c_str(), type);
55     if (type == TaskType::SINGLETON) {
56         std::string newName = name_ + std::to_string(++singletonTaskId);
57         pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(newName, type, priority);
58     } else {
59         pipelineThread_ = PipeLineThreadPool::GetInstance().FindThread(groupId, type, priority);
60     }
61 }
62 
Init()63 void TaskInner::Init()
64 {
65     MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " Init", name_.c_str());
66     pipelineThread_->AddTask(shared_from_this());
67 }
68 
DeInit()69 void TaskInner::DeInit()
70 {
71     MEDIA_LOG_I_T(PUBLIC_LOG_S " DeInit", name_.c_str());
72     pipelineThread_->RemoveTask(shared_from_this());
73     {
74         if (pipelineThread_->IsRunningInSelf()) {
75             runningState_ = RunningState::STOPPED;
76             topProcessUs_ = -1;
77         } else {
78             AutoLock lock1(jobMutex_);
79             AutoLock lock2(stateMutex_);
80             runningState_ = RunningState::STOPPED;
81             topProcessUs_ = -1;
82         }
83     }
84     MEDIA_LOG_I_T(PUBLIC_LOG_S " DeInit done", name_.c_str());
85 }
86 
~TaskInner()87 TaskInner::~TaskInner()
88 {
89     MEDIA_LOG_D_T(PUBLIC_LOG_S " dtor", name_.c_str());
90 }
91 
UpdateDelayTime(int64_t delayUs)92 void TaskInner::UpdateDelayTime(int64_t delayUs)
93 {
94     if (!singleLoop_) {
95         MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime do nothing", name_.c_str());
96         return;
97     }
98     MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime enter topProcessUs:" PUBLIC_LOG_D64
99         ", delayUs:" PUBLIC_LOG_D64, name_.c_str(), topProcessUs_, delayUs);
100     pipelineThread_->LockJobState();
101     AutoLock lock(stateMutex_);
102     if (runningState_ != RunningState::STARTED) {
103         pipelineThread_->UnLockJobState(false);
104         return;
105     }
106     topProcessUs_ = GetNowUs() + delayUs;
107     pipelineThread_->UnLockJobState(true);
108     MEDIA_LOG_D_T("task " PUBLIC_LOG_S " UpdateDelayTime exit topProcessUs:" PUBLIC_LOG_D64,
109         name_.c_str(), topProcessUs_);
110 }
111 
Start()112 void TaskInner::Start()
113 {
114     MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Start", name_.c_str());
115     pipelineThread_->LockJobState();
116     AutoLock lock(stateMutex_);
117     runningState_ = RunningState::STARTED;
118     if (singleLoop_) {
119         if (!job_) {
120             MEDIA_LOG_D_T("task " PUBLIC_LOG_S " Start, job invalid", name_.c_str());
121         }
122         topProcessUs_ = GetNowUs();
123     } else {
124         UpdateTop();
125     }
126     pipelineThread_->UnLockJobState(true);
127     MEDIA_LOG_I_FALSE_D(isStateLogEnabled_.load(), "task " PUBLIC_LOG_S " Start done, topProcessUs:%{public}" PRId64,
128         name_.c_str(), topProcessUs_);
129 }
130 
Stop()131 void TaskInner::Stop()
132 {
133     if (pipelineThread_->IsRunningInSelf()) {
134         MEDIA_LOG_W_T(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
135         runningState_ = RunningState::STOPPED;
136         topProcessUs_ = -1;
137         return;
138     }
139     MEDIA_LOG_I_T(">> " PUBLIC_LOG_S " Stop", name_.c_str());
140     AutoLock lock1(jobMutex_);
141     pipelineThread_->LockJobState();
142     AutoLock lock2(stateMutex_);
143     if (runningState_.load() == RunningState::STOPPED) {
144         pipelineThread_->UnLockJobState(false);
145         return;
146     }
147     runningState_ = RunningState::STOPPED;
148     topProcessUs_ = -1;
149     pipelineThread_->UnLockJobState(true);
150     MEDIA_LOG_I_T(PUBLIC_LOG_S " Stop <<", name_.c_str());
151 }
152 
StopAsync()153 void TaskInner::StopAsync()
154 {
155     if (pipelineThread_->IsRunningInSelf()) {
156         MEDIA_LOG_W_T(PUBLIC_LOG_S " Stop done in self task", name_.c_str());
157         runningState_ = RunningState::STOPPED;
158         topProcessUs_ = -1;
159         return;
160     }
161     MEDIA_LOG_I_T(PUBLIC_LOG_S " StopAsync", name_.c_str());
162     pipelineThread_->LockJobState();
163     AutoLock lock(stateMutex_);
164     bool stateChanged = false;
165     if (runningState_.load() != RunningState::STOPPED) {
166         runningState_ = RunningState::STOPPED;
167         topProcessUs_ = -1;
168         stateChanged = true;
169     }
170     pipelineThread_->UnLockJobState(stateChanged);
171 }
172 
Pause()173 void TaskInner::Pause()
174 {
175     if (pipelineThread_->IsRunningInSelf()) {
176         RunningState state = runningState_.load();
177         if (state == RunningState::STARTED) {
178             MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
179                 PUBLIC_LOG_S " Pause done in self task", name_.c_str());
180             runningState_ = RunningState::PAUSED;
181             topProcessUs_ = -1;
182             return;
183         } else {
184             MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
185                 PUBLIC_LOG_S " Pause skip in self task, curret State: " PUBLIC_LOG_D32, name_.c_str(), state);
186             return;
187         }
188     }
189     MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause", name_.c_str());
190     AutoLock lock1(jobMutex_);
191     pipelineThread_->LockJobState();
192     AutoLock lock2(stateMutex_);
193     RunningState state = runningState_.load();
194     if (state != RunningState::STARTED) {
195         pipelineThread_->UnLockJobState(false);
196         return;
197     }
198     runningState_ = RunningState::PAUSED;
199     topProcessUs_ = -1;
200     pipelineThread_->UnLockJobState(true);
201     MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " Pause done.", name_.c_str());
202 }
203 
PauseAsync()204 void TaskInner::PauseAsync()
205 {
206     if (pipelineThread_->IsRunningInSelf()) {
207         RunningState state = runningState_.load();
208         if (state == RunningState::STARTED) {
209             MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
210                 PUBLIC_LOG_S " PauseAsync done in self task", name_.c_str());
211             runningState_ = RunningState::PAUSED;
212             topProcessUs_ = -1;
213             return;
214         } else {
215             MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(),
216                 PUBLIC_LOG_S " PauseAsync skip in self task, curretState:%{public}d", name_.c_str(), state);
217             return;
218         }
219     }
220     MEDIA_LOG_I_FALSE_D_T(isStateLogEnabled_.load(), PUBLIC_LOG_S " PauseAsync", name_.c_str());
221     pipelineThread_->LockJobState();
222     AutoLock lock(stateMutex_);
223     bool stateChanged = false;
224     if (runningState_.load() == RunningState::STARTED) {
225         runningState_ = RunningState::PAUSED;
226         topProcessUs_ = -1;
227         stateChanged = true;
228     }
229     pipelineThread_->UnLockJobState(stateChanged);
230 }
231 
RegisterJob(const std::function<int64_t ()> & job)232 void TaskInner::RegisterJob(const std::function<int64_t()>& job)
233 {
234     MEDIA_LOG_I_T(PUBLIC_LOG_S " RegisterHandler", name_.c_str());
235     job_ = std::move(job);
236 }
237 
SubmitJobOnce(const std::function<void ()> & job,int64_t delayUs,bool wait)238 void TaskInner::SubmitJobOnce(const std::function<void()>& job, int64_t delayUs, bool wait)
239 {
240     MEDIA_LOG_D_T(PUBLIC_LOG_S " SubmitJobOnce", name_.c_str());
241     int64_t time = InsertJob(job, delayUs, false);
242     if (wait) {
243         AutoLock lock(stateMutex_);
244         replyCond_.Wait(lock, [this, time] { return msgQueue_.find(time) == msgQueue_.end(); });
245     }
246 }
247 
SubmitJob(const std::function<void ()> & job,int64_t delayUs,bool wait)248 void TaskInner::SubmitJob(const std::function<void()>& job, int64_t delayUs, bool wait)
249 {
250     MEDIA_LOG_D_T(PUBLIC_LOG_S " SubmitJob delayUs:%{public}" PRId64, name_.c_str(), delayUs);
251     int64_t time = InsertJob(job, delayUs, true);
252     if (wait) {
253         AutoLock lock(stateMutex_);
254         replyCond_.Wait(lock, [this, time] { return jobQueue_.find(time) == jobQueue_.end(); });
255     }
256 }
257 
UpdateTop()258 void TaskInner::UpdateTop()
259 {
260     // jobQueue_ is only handled in STARTED state, msgQueue_ always got handled.
261     if (msgQueue_.empty() && ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty())) {
262         topProcessUs_ = -1;
263         return;
264     }
265     if (msgQueue_.empty()) {
266         topProcessUs_ = jobQueue_.begin()->first;
267         topIsJob_ = true;
268     } else if ((runningState_.load() != RunningState::STARTED) || jobQueue_.empty()) {
269         topProcessUs_ = msgQueue_.begin()->first;
270         topIsJob_ = false;
271     } else {
272         int64_t msgProcessTime = msgQueue_.begin()->first;
273         int64_t jobProcessTime = jobQueue_.begin()->first;
274         int64_t nowUs =  GetNowUs();
275         if (msgProcessTime <= nowUs || msgProcessTime <= jobProcessTime) {
276             topProcessUs_ = msgProcessTime;
277             topIsJob_ = false;
278         } else  {
279             topProcessUs_ = jobProcessTime;
280             topIsJob_ = true;
281         }
282     }
283 }
284 
NextJobUs()285 int64_t TaskInner::NextJobUs()
286 {
287     AutoLock lock(stateMutex_);
288     return topProcessUs_;
289 }
290 
HandleJob()291 void TaskInner::HandleJob()
292 {
293     AutoLock lock(jobMutex_);
294     if (singleLoop_) {
295         stateMutex_.lock();
296         int64_t currentTopProcessUs = topProcessUs_;
297         if (runningState_.load() == RunningState::PAUSED || runningState_.load() == RunningState::STOPPED) {
298             topProcessUs_ = -1;
299             stateMutex_.unlock();
300             return;
301         }
302         // unlock stateMutex otherwise pauseAsync/stopAsync function will wait job finish.
303         stateMutex_.unlock();
304         int64_t nextDelay = (!job_) ? INVALID_DELAY_TIME_US : job_();
305 
306         AutoLock lock(stateMutex_);
307         // if topProcessUs_ is -1, we already pause/stop in job_()
308         // if topProcessUs_ is changed, we should ignore the returned delay time.
309         if (topProcessUs_ != -1 && currentTopProcessUs == topProcessUs_) {
310             topProcessUs_ = GetNowUs() + nextDelay;
311         }
312     } else {
313         std::function<void()> nextJob;
314         stateMutex_.lock();
315         if (topIsJob_) {
316             nextJob = std::move(jobQueue_.begin()->second);
317             jobQueue_.erase(jobQueue_.begin());
318         } else {
319             nextJob = std::move(msgQueue_.begin()->second);
320             msgQueue_.erase(msgQueue_.begin());
321         }
322         {
323 			// unlock stateMutex otherwise pauseAsync/stopAsync function will wait job finish.
324             stateMutex_.unlock();
325             nextJob();
326             replyCond_.NotifyAll();
327         }
328         AutoLock lock(stateMutex_);
329         UpdateTop();
330     }
331 }
332 
UpdateThreadPriority(const uint32_t newPriority,const std::string & strBundleName)333 void TaskInner::UpdateThreadPriority(const uint32_t newPriority, const std::string &strBundleName)
334 {
335     FALSE_RETURN(pipelineThread_ != nullptr);
336     bool tmpFlag = singleLoop_;
337     singleLoop_ = false;
338     SubmitJobOnce(
339         [this, newPriority, strBundleName] { pipelineThread_->UpdateThreadPriority(newPriority, strBundleName); },
340         0, true);
341     singleLoop_ = tmpFlag;
342 }
343 
InsertJob(const std::function<void ()> & job,int64_t delayUs,bool inJobQueue)344 int64_t TaskInner::InsertJob(const std::function<void()>& job, int64_t delayUs, bool inJobQueue)
345 {
346     pipelineThread_->LockJobState();
347     AutoLock lock(stateMutex_);
348     int64_t nowUs = GetNowUs();
349     if (delayUs < 0) {
350         delayUs = 0;
351     }
352     int64_t processTime = nowUs + delayUs;
353     if (inJobQueue) {
354         while (jobQueue_.find(processTime) != jobQueue_.end()) { // To prevent dropping job unexpectedly
355             MEDIA_LOG_W_T("DUPLICATIVE jobQueue_ TIMESTAMP!!!");
356             processTime++;
357         }
358         jobQueue_[processTime] = std::move(job);
359     } else {
360         while (msgQueue_.find(processTime) != msgQueue_.end()) { // To prevent dropping job unexpectedly
361             MEDIA_LOG_W_T("DUPLICATIVE msgQueue_ TIMESTAMP!!!");
362             processTime++;
363         }
364         msgQueue_[processTime] = std::move(job);
365     }
366     int64_t lastProcessUs = topProcessUs_;
367     // update top if only new job is more emgercy or jobqueue is empty
368     if (processTime <= topProcessUs_ || topProcessUs_ == -1) {
369         UpdateTop();
370     }
371     // if top is updated we should wake pipeline thread
372     pipelineThread_->UnLockJobState(lastProcessUs != topProcessUs_);
373     return processTime;
374 }
375 } // namespace Media
376 } // namespace OHOS
377