• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #define HST_LOG_TAG "Task"
16 #include "osal/task/task.h"
17 #include "osal/task/taskInner.h"
18 #include "osal/task/thread.h"
19 #include "osal/utils/util.h"
20 #include "cpp_ext/memory_ext.h"
21 #include "common/log.h"
22 
23 #include <mutex>
24 
namespace {
// HiLog label for this translation unit (core log type, foundation domain).
// NOTE(review): the tag reads "PipelineTreadPool" - looks like a typo for "PipelineThreadPool".
// It is a runtime log tag, so confirm nothing filters on the current spelling before changing it.
constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "PipelineTreadPool" };
}
28 
29 namespace OHOS {
30 namespace Media {
namespace {
    // Scheduling tolerance in microseconds. PipeLineThread::Run() runs a job right away when it is due
    // within this window, and otherwise adds it to the remaining delay before the division below, which
    // rounds the wait to the nearest millisecond instead of truncating it.
    constexpr int64_t ADJUST_US = 500;
    // Microseconds per millisecond; Run() divides by it to turn a microsecond delay into the value passed
    // to syncCond_.WaitFor (presumably milliseconds - confirm against the condition variable API).
    constexpr int64_t US_PER_MS = 1000;
}
35 
ConvertPriorityType(TaskPriority priority)36 static ThreadPriority ConvertPriorityType(TaskPriority priority)
37 {
38     switch (priority) {
39         case TaskPriority::LOW:
40             return ThreadPriority::LOW;
41         case TaskPriority::NORMAL:
42             return ThreadPriority::NORMAL;
43         case TaskPriority::MIDDLE:
44             return ThreadPriority::MIDDLE;
45         case TaskPriority::HIGHEST:
46             return ThreadPriority::HIGHEST;
47         default:
48             return ThreadPriority::NORMAL;
49     }
50 }
51 
TaskTypeConvert(TaskType type)52 static std::string TaskTypeConvert(TaskType type)
53 {
54     static const std::map<TaskType, std::string> table = {
55         {TaskType::GLOBAL, "G"},
56         {TaskType::VIDEO, "V"},
57         {TaskType::AUDIO, "A"},
58         {TaskType::DEMUXER, "DEM"},
59         {TaskType::DECODER, "DEC"},
60         {TaskType::SUBTITLE, "T"},
61         {TaskType::SINGLETON, "S"},
62     };
63     auto it = table.find(type);
64     if (it != table.end()) {
65         return it->second;
66     }
67     return "NA";
68 }
69 
// Current reading of the monotonic (steady) clock, expressed in microseconds since that clock's epoch.
static int64_t GetNowUs()
{
    using std::chrono::microseconds;
    const auto sinceEpoch = std::chrono::steady_clock::now().time_since_epoch();
    return std::chrono::duration_cast<microseconds>(sinceEpoch).count();
}
75 
GetInstance()76 PipeLineThreadPool& PipeLineThreadPool::GetInstance()
77 {
78     static PipeLineThreadPool instance;
79     return instance;
80 }
81 
~PipeLineThreadPool()82 PipeLineThreadPool::~PipeLineThreadPool()
83 {
84     std::map<std::string, std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>>> tempMap;
85     {
86         std::lock_guard<Mutex> lock(mutex_);
87         std::swap(tempMap, workerGroupMap);
88     }
89     tempMap.clear();
90 }
91 
FindThread(const std::string & groupId,TaskType taskType,TaskPriority priority)92 std::shared_ptr<PipeLineThread> PipeLineThreadPool::FindThread(const std::string &groupId,
93     TaskType taskType, TaskPriority priority)
94 {
95     AutoLock lock(mutex_);
96     if (workerGroupMap.find(groupId) == workerGroupMap.end()) {
97         workerGroupMap[groupId] = std::make_shared<std::list<std::shared_ptr<PipeLineThread>>>();
98     }
99     std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList = workerGroupMap[groupId];
100     for (auto thread : *threadList.get()) {
101         if (thread->type_ == taskType) {
102             return thread;
103         }
104     }
105     std::shared_ptr<PipeLineThread> newThread = std::make_shared<PipeLineThread>(groupId, taskType, priority);
106     threadList->push_back(newThread);
107     return newThread;
108 }
109 
DestroyThread(const std::string & groupId)110 void PipeLineThreadPool::DestroyThread(const std::string &groupId)
111 {
112     MEDIA_LOG_I("DestroyThread groupId:" PUBLIC_LOG_S, groupId.c_str());
113     std::shared_ptr<std::list<std::shared_ptr<PipeLineThread>>> threadList;
114     {
115         AutoLock lock(mutex_);
116         if (workerGroupMap.find(groupId) == workerGroupMap.end()) {
117             MEDIA_LOG_E("DestroyThread groupId not exist");
118             return;
119         }
120         threadList = workerGroupMap[groupId];
121         workerGroupMap.erase(groupId);
122     }
123     for (auto thread : *threadList.get()) {
124         thread->Exit();
125     }
126 }
127 
PipeLineThread(std::string groupId,TaskType type,TaskPriority priority)128 PipeLineThread::PipeLineThread(std::string groupId, TaskType type, TaskPriority priority)
129     : groupId_(groupId), type_(type)
130 {
131     MEDIA_LOG_I("PipeLineThread groupId:" PUBLIC_LOG_S " type:%{public}d created call", groupId_.c_str(), type);
132     loop_ = CppExt::make_unique<Thread>(ConvertPriorityType(priority));
133     name_ = groupId_ + "_" + TaskTypeConvert(type);
134     loop_->SetName(name_);
135     threadExit_ = false;
136     if (loop_->CreateThread([this] { Run(); })) {
137         threadExit_ = false;
138     } else {
139         threadExit_ = true;
140         loop_ = nullptr;
141         MEDIA_LOG_E("PipeLineThread " PUBLIC_LOG_S " create failed", name_.c_str());
142     }
143 }
144 
// Forwards a priority change to the underlying Thread object.
// Skipped (presumably with a warning from FALSE_RETURN_W) when the worker has already exited or
// loop_ is null.
void PipeLineThread::UpdateThreadPriority(const uint32_t newPriority, const std::string &strBundleName)
{
    FALSE_RETURN_W(!threadExit_.load() && loop_);
    loop_->UpdateThreadPriority(newPriority, strBundleName);
}
150 
// Makes sure the worker has been asked to stop before the object goes away; Exit() returns early
// when the thread already exited or was never created.
PipeLineThread::~PipeLineThread()
{
    Exit();
}
155 
// Asks the worker loop to stop and, unless called on the worker thread itself, releases loop_.
// Returns early (via FALSE_RETURN_W) when the thread has already exited or loop_ is null.
void PipeLineThread::Exit()
{
    {
        AutoLock lock(mutex_);
        FALSE_RETURN_W(!threadExit_.load() && loop_);

        MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " exit", name_.c_str());
        // Raise the flag while holding mutex_ and wake the worker so Run() sees it and leaves its loop.
        threadExit_ = true;
        syncCond_.NotifyAll();

        // When Exit() is called on the worker thread itself it must not wait for the thread to
        // finish, otherwise the thread would block on itself and never quit.
        if (IsRunningInSelf()) {
            return;
        }
    }
    // Reset outside the lock: destroying loop_ waits for the thread to join.
    loop_ = nullptr;
}
175 
Run()176 void PipeLineThread::Run()
177 {
178     MEDIA_LOG_I("PipeLineThread " PUBLIC_LOG_S " run enter", name_.c_str());
179     while (true) {
180         std::shared_ptr<TaskInner> nextTask;
181         {
182             AutoLock lock(mutex_);
183             if (threadExit_.load()) {
184                 break;
185             }
186             int64_t nextJobUs = INT64_MAX;
187             for (auto task: taskList_) {
188                 int64_t taskJobUs = task->NextJobUs();
189                 if (taskJobUs == -1) {
190                     continue;
191                 }
192                 if (taskJobUs < nextJobUs) {
193                     nextJobUs = taskJobUs;
194                     nextTask = task;
195                 }
196             }
197             if (nextTask == nullptr) {
198                 syncCond_.Wait(lock);
199                 continue;
200             }
201             int64_t nowUs = GetNowUs();
202             if (nextJobUs > (nowUs + ADJUST_US)) {
203                 syncCond_.WaitFor(lock, (nextJobUs - nowUs + ADJUST_US) / US_PER_MS);
204                 continue;
205             }
206         }
207         nextTask->HandleJob();
208     }
209 }
210 
AddTask(std::shared_ptr<TaskInner> task)211 void PipeLineThread::AddTask(std::shared_ptr<TaskInner> task)
212 {
213     AutoLock lock(mutex_);
214     taskList_.push_back(task);
215 }
216 
RemoveTask(std::shared_ptr<TaskInner> task)217 void PipeLineThread::RemoveTask(std::shared_ptr<TaskInner> task)
218 {
219     {
220         AutoLock lock(mutex_);
221         taskList_.remove(task);
222         FALSE_LOG_MSG(!taskList_.empty(),
223          "PipeLineThread " PUBLIC_LOG_S " remove all Task", name_.c_str());
224     }
225     if (type_ == TaskType::SINGLETON) {
226         PipeLineThreadPool::GetInstance().DestroyThread(groupId_);
227     }
228 }
229 
LockJobState()230 void PipeLineThread::LockJobState()
231 {
232     if (IsRunningInSelf()) {
233         return;
234     }
235     mutex_.lock();
236 }
237 
UnLockJobState(bool notifyChange)238 void PipeLineThread::UnLockJobState(bool notifyChange)
239 {
240     if (IsRunningInSelf()) {
241         return;
242     }
243     mutex_.unlock();
244     if (notifyChange) {
245         syncCond_.NotifyAll();
246     }
247 }
248 
IsRunningInSelf()249 bool PipeLineThread::IsRunningInSelf()
250 {
251     return loop_ ? loop_->IsRunningInSelf() : false;
252 }
253 } // namespace Media
254 } // namespace OHOS
255