1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "pipeline/include/pipeline.h"
17
18 #include <queue>
19 #include <stack>
20
21 #include "osal/task/autolock.h"
22 #include "osal/task/jobutils.h"
23
24 #include "av_trans_log.h"
25
26 #undef DH_LOG_TAG
27 #define DH_LOG_TAG "Pipeline"
28
29 namespace OHOS {
30 namespace DistributedHardware {
31 namespace Pipeline {
32 static std::atomic<uint16_t> pipeLineId = 0;
33
GetNextPipelineId()34 int32_t Pipeline::GetNextPipelineId()
35 {
36 return pipeLineId++;
37 }
38
~Pipeline()39 Pipeline::~Pipeline()
40 {
41 }
42
Init(const std::shared_ptr<EventReceiver> & receiver,const std::shared_ptr<FilterCallback> & callback,const std::string & groupId)43 void Pipeline::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback,
44 const std::string& groupId)
45 {
46 AVTRANS_LOGI("Pipeline::Init");
47 eventReceiver_ = receiver;
48 filterCallback_ = callback;
49 groupId_ = groupId;
50 }
51
Prepare()52 Status Pipeline::Prepare()
53 {
54 AVTRANS_LOGI("Prepare enter.");
55 Status ret = Status::OK;
56 Media::SubmitJobOnce([&] {
57 Media::AutoLock lock(mutex_);
58 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
59 if (*it == nullptr) {
60 continue;
61 }
62 ret = (*it)->Prepare();
63 if (ret != Status::OK) {
64 return;
65 }
66 }
67 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
68 if (*it == nullptr) {
69 continue;
70 }
71 ret = (*it)->WaitAllState(FilterState::READY);
72 if (ret != Status::OK) {
73 return;
74 }
75 }
76 });
77 AVTRANS_LOGI("Prepare done ret = %{public}d", ret);
78 return ret;
79 }
80
Start()81 Status Pipeline::Start()
82 {
83 AVTRANS_LOGI("Start enter.");
84 Status ret = Status::OK;
85 Media::SubmitJobOnce([&] {
86 Media::AutoLock lock(mutex_);
87 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
88 if (*it == nullptr) {
89 continue;
90 }
91 ret = (*it)->Start();
92 if (ret != Status::OK) {
93 return;
94 }
95 }
96 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
97 if (*it == nullptr) {
98 continue;
99 }
100 ret = (*it)->WaitAllState(FilterState::RUNNING);
101 if (ret != Status::OK) {
102 return;
103 }
104 }
105 });
106 AVTRANS_LOGI("Start done ret = %{public}d", ret);
107 return ret;
108 }
109
Pause()110 Status Pipeline::Pause()
111 {
112 AVTRANS_LOGI("Pause enter.");
113 Status ret = Status::OK;
114 Media::SubmitJobOnce([&] {
115 Media::AutoLock lock(mutex_);
116 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
117 if (*it == nullptr) {
118 continue;
119 }
120 auto rtv = (*it)->Pause();
121 if (rtv != Status::OK) {
122 ret = rtv;
123 }
124 }
125 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
126 if (*it == nullptr) {
127 continue;
128 }
129 auto rtv = (*it)->WaitAllState(FilterState::PAUSED);
130 if (rtv != Status::OK) {
131 ret = rtv;
132 }
133 }
134 });
135 AVTRANS_LOGI("Pause done ret = %{public}d", ret);
136 return ret;
137 }
138
Resume()139 Status Pipeline::Resume()
140 {
141 AVTRANS_LOGI("Resume enter.");
142 Status ret = Status::OK;
143 Media::SubmitJobOnce([&] {
144 Media::AutoLock lock(mutex_);
145 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
146 if (*it == nullptr) {
147 continue;
148 }
149 ret = (*it)->Resume();
150 if (ret != Status::OK) {
151 return;
152 }
153 }
154 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
155 if (*it == nullptr) {
156 continue;
157 }
158 ret = (*it)->WaitAllState(FilterState::RUNNING);
159 if (ret != Status::OK) {
160 return;
161 }
162 }
163 });
164 AVTRANS_LOGI("Resume done ret = %{public}d", ret);
165 return ret;
166 }
167
Stop()168 Status Pipeline::Stop()
169 {
170 AVTRANS_LOGI("Stop enter.");
171 Status ret = Status::OK;
172 Media::SubmitJobOnce([&] {
173 Media::AutoLock lock(mutex_);
174 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
175 if (*it == nullptr) {
176 AVTRANS_LOGE("Pipeline error: %{public}zu", filters_.size());
177 continue;
178 }
179 auto rtv = (*it)->Stop();
180 if (rtv != Status::OK) {
181 ret = rtv;
182 }
183 }
184 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
185 if (*it == nullptr) {
186 continue;
187 }
188 auto rtv = (*it)->WaitAllState(FilterState::STOPPED);
189 if (rtv != Status::OK) {
190 ret = rtv;
191 }
192 }
193 filters_.clear();
194 });
195 AVTRANS_LOGI("Stop done ret = %{public}d", ret);
196 return ret;
197 }
198
Flush()199 Status Pipeline::Flush()
200 {
201 AVTRANS_LOGI("Flush enter.");
202 Media::SubmitJobOnce([&] {
203 Media::AutoLock lock(mutex_);
204 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
205 if (*it == nullptr) {
206 continue;
207 }
208 (*it)->Flush();
209 }
210 });
211 AVTRANS_LOGI("Flush end.");
212 return Status::OK;
213 }
214
Release()215 Status Pipeline::Release()
216 {
217 AVTRANS_LOGI("Release enter.");
218 Media::SubmitJobOnce([&] {
219 Media::AutoLock lock(mutex_);
220 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
221 if (*it == nullptr) {
222 continue;
223 }
224 (*it)->Release();
225 }
226 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
227 if (*it == nullptr) {
228 continue;
229 }
230 (*it)->WaitAllState(FilterState::RELEASED);
231 }
232 filters_.clear();
233 });
234 AVTRANS_LOGI("Release done.");
235 return Status::OK;
236 }
237
Preroll(bool render)238 Status Pipeline::Preroll(bool render)
239 {
240 AVTRANS_LOGI("Preroll enter.");
241 Status ret = Status::OK;
242 Media::AutoLock lock(mutex_);
243 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
244 if (*it == nullptr) {
245 continue;
246 }
247 auto rtv = (*it)->Preroll();
248 if (rtv != Status::OK) {
249 ret = rtv;
250 AVTRANS_LOGI("Preroll done ret = %{public}d", ret);
251 return ret;
252 }
253 }
254 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
255 if (*it == nullptr) {
256 continue;
257 }
258 auto rtv = (*it)->WaitPrerollDone(render);
259 if (rtv != Status::OK) {
260 ret = rtv;
261 AVTRANS_LOGI("Preroll done ret = %{public}d", ret);
262 return ret;
263 }
264 }
265 AVTRANS_LOGI("Preroll done ret = %{public}d", ret);
266 return ret;
267 }
268
SetPlayRange(int64_t start,int64_t end)269 Status Pipeline::SetPlayRange(int64_t start, int64_t end)
270 {
271 AVTRANS_LOGI("SetPlayRange enter.");
272 Media::SubmitJobOnce([&] {
273 Media::AutoLock lock(mutex_);
274 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
275 if (*it == nullptr) {
276 continue;
277 }
278 (*it)->SetPlayRange(start, end);
279 }
280 });
281 AVTRANS_LOGI("SetPlayRange done.");
282 return Status::OK;
283 }
284
AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)285 Status Pipeline::AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)
286 {
287 AVTRANS_LOGI("AddHeadFilters enter.");
288 std::vector<std::shared_ptr<Filter>> filtersToAdd;
289 for (auto& filterIn : filtersIn) {
290 if (filterIn == nullptr) {
291 continue;
292 }
293 bool matched = false;
294 for (const auto& filter : filters_) {
295 if (filterIn == filter) {
296 matched = true;
297 break;
298 }
299 }
300 if (!matched) {
301 filtersToAdd.push_back(filterIn);
302 filterIn->LinkPipeLine(groupId_);
303 }
304 }
305 if (filtersToAdd.empty()) {
306 AVTRANS_LOGI("filter already exists");
307 return Status::OK;
308 }
309 Media::SubmitJobOnce([&] {
310 Media::AutoLock lock(mutex_);
311 this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end());
312 });
313 AVTRANS_LOGI("AddHeadFilters done.");
314 return Status::OK;
315 }
316
RemoveHeadFilter(const std::shared_ptr<Filter> & filter)317 Status Pipeline::RemoveHeadFilter(const std::shared_ptr<Filter>& filter)
318 {
319 Media::SubmitJobOnce([&] {
320 Media::AutoLock lock(mutex_);
321 auto it = std::find_if(filters_.begin(), filters_.end(),
322 [&filter](const std::shared_ptr<Filter>& filterPtr) { return filterPtr == filter; });
323 if (it != filters_.end()) {
324 filters_.erase(it);
325 }
326 if (filter != nullptr) {
327 filter->Release();
328 filter->WaitAllState(FilterState::RELEASED);
329 filter->ClearAllNextFilters();
330 }
331 return Status::OK;
332 });
333 return Status::OK;
334 }
335
LinkFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)336 Status Pipeline::LinkFilters(const std::shared_ptr<Filter> &preFilter,
337 const std::vector<std::shared_ptr<Filter>> &nextFilters,
338 StreamType type)
339 {
340 TRUE_RETURN_V(preFilter == nullptr, Status::ERROR_NULL_POINTER);
341 for (auto nextFilter : nextFilters) {
342 TRUE_RETURN_V(nextFilter == nullptr, Status::ERROR_NULL_POINTER);
343 auto ret = preFilter->LinkNext(nextFilter, type);
344 nextFilter->LinkPipeLine(groupId_);
345 TRUE_RETURN_V(ret != Status::OK, ret);
346 }
347 return Status::OK;
348 }
349
UpdateFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)350 Status Pipeline::UpdateFilters(const std::shared_ptr<Filter> &preFilter,
351 const std::vector<std::shared_ptr<Filter>> &nextFilters,
352 StreamType type)
353 {
354 TRUE_RETURN_V(preFilter == nullptr, Status::ERROR_NULL_POINTER);
355 for (auto nextFilter : nextFilters) {
356 preFilter->UpdateNext(nextFilter, type);
357 }
358 return Status::OK;
359 }
360
UnLinkFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)361 Status Pipeline::UnLinkFilters(const std::shared_ptr<Filter> &preFilter,
362 const std::vector<std::shared_ptr<Filter>> &nextFilters,
363 StreamType type)
364 {
365 TRUE_RETURN_V(preFilter == nullptr, Status::ERROR_NULL_POINTER);
366 for (auto nextFilter : nextFilters) {
367 preFilter->UnLinkNext(nextFilter, type);
368 }
369 return Status::OK;
370 }
371
OnEvent(const Event & event)372 void Pipeline::OnEvent(const Event& event)
373 {
374 }
375 } // namespace Pipeline
376 } // namespace DistributedHardware
377 } // namespace OHOS
378