/*
 * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#define HST_LOG_TAG "Task"
#include "osal/task/task.h"
#include "osal/task/taskInner.h"
#include "osal/task/thread.h"
#include "osal/utils/util.h"
#include "cpp_ext/memory_ext.h"
#include "common/log.h"

#include <chrono>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>

namespace {
constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "PipelineThreadPool" };
}

namespace OHOS {
namespace Media {
namespace {
constexpr int64_t ADJUST_US = 500;
constexpr int64_t US_PER_MS = 1000;
}

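// Maps a task-level TaskPriority onto the ThreadPriority used by the underlying Thread; unknown values fall back
// to NORMAL.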
static ThreadPriority ConvertPriorityType(TaskPriority priority)
{
    switch (priority) {
        case TaskPriority::LOW:
            return ThreadPriority::LOW;
        case TaskPriority::NORMAL:
            return ThreadPriority::NORMAL;
        case TaskPriority::MIDDLE:
            return ThreadPriority::MIDDLE;
        case TaskPriority::HIGHEST:
            return ThreadPriority::HIGHEST;
        default:
            return ThreadPriority::NORMAL;
    }
}

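// Returns a short string tag for a TaskType, used as the thread-name suffix; unknown types map to "NA".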
static std::string TaskTypeConvert(TaskType type)
{
    static const std::map<TaskType, std::string> table = {
        {TaskType::GLOBAL, "G"},
        {TaskType::VIDEO, "V"},
        {TaskType::AUDIO, "A"},
        {TaskType::DEMUXER, "DEM"},
        {TaskType::DECODER, "DEC"},
        {TaskType::SUBTITLE, "T"},
        {TaskType::SINGLETON, "S"},
    };
    auto it = table.find(type);
    if (it != table.end()) {
        return it->second;
    }
    return "NA";
}

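// Returns the current steady-clock time in microseconds.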
static int64_t GetNowUs()
{
    auto now = std::chrono::steady_clock::now();
    return std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count();
}

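// Returns the process-wide singleton thread pool.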
PipeLineThreadPool& PipeLineThreadPool::GetInstance()
{
    static PipeLineThreadPool instance;
    return instance;
}

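// Returns the thread with the given task type in the group identified by groupId, creating the group entry and the
// thread on first use. The priority only takes effect when a new thread is created.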
std::shared_ptr<PipeLineThread> PipeLineThreadPool::FindThread(const std::string &groupId,
    TaskType taskType, TaskPriority priority)
{
    AutoLock lock(mutex_);
    if (workerGroupMap.find(groupId) == workerGroupMap.end()) {
        workerGroupMap[groupId] = std::make_shared<std::list<std::shared_ptr<PipeLineThread>>>();
    }
    std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList = workerGroupMap[groupId];
    for (auto &thread : *threadList) {
        if (thread->type_ == taskType) {
            return thread;
        }
    }
    std::shared_ptr<PipeLineThread> newThread = std::make_shared<PipeLineThread>(groupId, taskType, priority);
    threadList->push_back(newThread);
    return newThread;
}

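// Exits every thread in the given group and removes the group from the pool. The pool lock is released before
// Exit() is called on each thread.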
void PipeLineThreadPool::DestroyThread(const std::string &groupId)
{
    MEDIA_LOG_I("DestroyThread groupId:" PUBLIC_LOG_S, groupId.c_str());
    std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList;
    {
        AutoLock lock(mutex_);
        if (workerGroupMap.find(groupId) == workerGroupMap.end()) {
            MEDIA_LOG_E("DestroyThread groupId does not exist");
            return;
        }
        threadList = workerGroupMap[groupId];
        workerGroupMap.erase(groupId);
    }
    for (auto &thread : *threadList) {
        thread->Exit();
    }
}

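// Creates the worker thread for a group/type pair. On thread-creation failure, loop_ is reset and the object is
// marked as exited so later calls become no-ops.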
PipeLineThread::PipeLineThread(std::string groupId, TaskType type, TaskPriority priority)
    : groupId_(groupId), type_(type)
{
    MEDIA_LOG_I("PipeLineThread groupId:" PUBLIC_LOG_S " type:%{public}d created", groupId_.c_str(), type);
    loop_ = CppExt::make_unique<Thread>(ConvertPriorityType(priority));
    name_ = groupId_ + "_" + TaskTypeConvert(type);
    loop_->SetName(name_);
    threadExit_ = false;
    if (loop_->CreateThread([this] { Run(); })) {
        threadExit_ = false;
    } else {
        threadExit_ = true;
        loop_ = nullptr;
        MEDIA_LOG_E("PipeLineThread " PUBLIC_LOG_S " create failed", name_.c_str());
    }
}

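// Forwards a priority update to the underlying thread; ignored once the thread has exited or failed to start.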
void PipeLineThread::UpdateThreadPriority(const uint32_t newPriority, const std::string &strBundleName)
{
    FALSE_RETURN_W(!threadExit_.load() && loop_);
    loop_->UpdateThreadPriority(newPriority, strBundleName);
}

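// Ensures the worker thread is stopped before destruction.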
PipeLineThread::~PipeLineThread()
{
    Exit();
}

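// Marks the thread as exiting and wakes the worker. When called from the worker thread itself it must not join;
// otherwise loop_ is reset, which joins the thread.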
void PipeLineThread::Exit()
{
    {
        AutoLock lock(mutex_);
        FALSE_RETURN_W(!threadExit_.load() && loop_);

        MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " exit", name_.c_str());
        threadExit_ = true;
        syncCond_.NotifyAll();

        // When the exit is triggered from the running worker thread itself, we must not wait for it to join;
        // otherwise the thread would block on its own termination and never quit.
        if (IsRunningInSelf()) {
            return;
        }
    }
    // Destroying loop_ joins the worker thread.
    loop_ = nullptr;
}

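// Worker loop: under the lock, pick the task with the earliest pending job time, then either wait until that time
// (or until notified) or run the job outside the lock. Exits once threadExit_ is set.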
void PipeLineThread::Run()
{
    MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " run enter", name_.c_str());
    while (true) {
        std::shared_ptr<TaskInner> nextTask;
        {
            AutoLock lock(mutex_);
            if (threadExit_.load()) {
                break;
            }
            int64_t nextJobUs = INT64_MAX;
            for (auto &task : taskList_) {
                int64_t taskJobUs = task->NextJobUs();
                if (taskJobUs == -1) {
                    continue;
                }
                if (taskJobUs < nextJobUs) {
                    nextJobUs = taskJobUs;
                    nextTask = task;
                }
            }
            if (nextTask == nullptr) {
                syncCond_.Wait(lock);
                continue;
            }
            int64_t nowUs = GetNowUs();
            if (nextJobUs > (nowUs + ADJUST_US)) {
                syncCond_.WaitFor(lock, (nextJobUs - nowUs + ADJUST_US) / US_PER_MS);
                continue;
            }
        }
        nextTask->HandleJob();
    }
}

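// Registers a task with this worker thread.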
void PipeLineThread::AddTask(std::shared_ptr<TaskInner> task)
{
    AutoLock lock(mutex_);
    taskList_.push_back(task);
}

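// Removes a task from this worker thread. For a SINGLETON thread, removing its task destroys the whole group via
// the thread pool.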
void PipeLineThread::RemoveTask(std::shared_ptr<TaskInner> task)
{
    {
        AutoLock lock(mutex_);
        taskList_.remove(task);
        FALSE_LOG_MSG(!taskList_.empty(),
            "PipeLineThread " PUBLIC_LOG_S " removed all tasks", name_.c_str());
    }
    if (type_ == TaskType::SINGLETON) {
        PipeLineThreadPool::GetInstance().DestroyThread(groupId_);
    }
}

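// Acquires the scheduling mutex so a task's job state can be changed safely; no-op when called from the worker
// thread itself.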
void PipeLineThread::LockJobState()
{
    if (IsRunningInSelf()) {
        return;
    }
    mutex_.lock();
}

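// Releases the scheduling mutex taken by LockJobState() and, if requested, wakes the worker so it re-evaluates the
// task list; no-op when called from the worker thread itself.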
void PipeLineThread::UnLockJobState(bool notifyChange)
{
    if (IsRunningInSelf()) {
        return;
    }
    mutex_.unlock();
    if (notifyChange) {
        syncCond_.NotifyAll();
    }
}

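// Returns true when the caller is the worker thread owned by this PipeLineThread.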
bool PipeLineThread::IsRunningInSelf()
{
    return loop_ ? loop_->IsRunningInSelf() : false;
}
} // namespace Media
} // namespace OHOS