• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "pipeline/include/pipeline.h"
17 
18 #include <queue>
19 #include <stack>
20 
21 #include "osal/task/autolock.h"
22 #include "osal/task/jobutils.h"
23 
24 #include "av_trans_log.h"
25 
26 #undef DH_LOG_TAG
27 #define DH_LOG_TAG "Pipeline"
28 
29 namespace OHOS {
30 namespace DistributedHardware {
31 namespace Pipeline {
32 static std::atomic<uint16_t> pipeLineId = 0;
33 
GetNextPipelineId()34 int32_t Pipeline::GetNextPipelineId()
35 {
36     return pipeLineId++;
37 }
38 
~Pipeline()39 Pipeline::~Pipeline()
40 {
41 }
42 
Init(const std::shared_ptr<EventReceiver> & receiver,const std::shared_ptr<FilterCallback> & callback,const std::string & groupId)43 void Pipeline::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback,
44     const std::string& groupId)
45 {
46     AVTRANS_LOGI("Pipeline::Init");
47     eventReceiver_ = receiver;
48     filterCallback_ = callback;
49     groupId_ = groupId;
50 }
51 
Prepare()52 Status Pipeline::Prepare()
53 {
54     AVTRANS_LOGI("Prepare enter.");
55     Status ret = Status::OK;
56     Media::SubmitJobOnce([&] {
57         Media::AutoLock lock(mutex_);
58         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
59             if (*it == nullptr) {
60                 continue;
61             }
62             ret = (*it)->Prepare();
63             if (ret != Status::OK) {
64                 return;
65             }
66         }
67         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
68             if (*it == nullptr) {
69                 continue;
70             }
71             ret = (*it)->WaitAllState(FilterState::READY);
72             if (ret != Status::OK) {
73                 return;
74             }
75         }
76     });
77     AVTRANS_LOGI("Prepare done ret = %{public}d", ret);
78     return ret;
79 }
80 
Start()81 Status Pipeline::Start()
82 {
83     AVTRANS_LOGI("Start enter.");
84     Status ret = Status::OK;
85     Media::SubmitJobOnce([&] {
86         Media::AutoLock lock(mutex_);
87         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
88             if (*it == nullptr) {
89                 continue;
90             }
91             ret = (*it)->Start();
92             if (ret != Status::OK) {
93                 return;
94             }
95         }
96         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
97             if (*it == nullptr) {
98                 continue;
99             }
100             ret = (*it)->WaitAllState(FilterState::RUNNING);
101             if (ret != Status::OK) {
102                 return;
103             }
104         }
105     });
106     AVTRANS_LOGI("Start done ret = %{public}d", ret);
107     return ret;
108 }
109 
Pause()110 Status Pipeline::Pause()
111 {
112     AVTRANS_LOGI("Pause enter.");
113     Status ret = Status::OK;
114     Media::SubmitJobOnce([&] {
115         Media::AutoLock lock(mutex_);
116         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
117             if (*it == nullptr) {
118                 continue;
119             }
120             auto rtv = (*it)->Pause();
121             if (rtv != Status::OK) {
122                 ret = rtv;
123             }
124         }
125         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
126             if (*it == nullptr) {
127                 continue;
128             }
129             auto rtv = (*it)->WaitAllState(FilterState::PAUSED);
130             if (rtv != Status::OK) {
131                 ret = rtv;
132             }
133         }
134     });
135     AVTRANS_LOGI("Pause done ret = %{public}d", ret);
136     return ret;
137 }
138 
Resume()139 Status Pipeline::Resume()
140 {
141     AVTRANS_LOGI("Resume enter.");
142     Status ret = Status::OK;
143     Media::SubmitJobOnce([&] {
144         Media::AutoLock lock(mutex_);
145         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
146             if (*it == nullptr) {
147                 continue;
148             }
149             ret = (*it)->Resume();
150             if (ret != Status::OK) {
151                 return;
152             }
153         }
154         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
155             if (*it == nullptr) {
156                 continue;
157             }
158             ret = (*it)->WaitAllState(FilterState::RUNNING);
159             if (ret != Status::OK) {
160                 return;
161             }
162         }
163     });
164     AVTRANS_LOGI("Resume done ret = %{public}d", ret);
165     return ret;
166 }
167 
Stop()168 Status Pipeline::Stop()
169 {
170     AVTRANS_LOGI("Stop enter.");
171     Status ret = Status::OK;
172     Media::SubmitJobOnce([&] {
173         Media::AutoLock lock(mutex_);
174         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
175             if (*it == nullptr) {
176                 AVTRANS_LOGE("Pipeline error: %{public}zu", filters_.size());
177                 continue;
178             }
179             auto rtv = (*it)->Stop();
180             if (rtv != Status::OK) {
181                 ret = rtv;
182             }
183         }
184         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
185             if (*it == nullptr) {
186                 continue;
187             }
188             auto rtv = (*it)->WaitAllState(FilterState::STOPPED);
189             if (rtv != Status::OK) {
190                 ret = rtv;
191             }
192         }
193         filters_.clear();
194     });
195     AVTRANS_LOGI("Stop done ret = %{public}d", ret);
196     return ret;
197 }
198 
Flush()199 Status Pipeline::Flush()
200 {
201     AVTRANS_LOGI("Flush enter.");
202     Media::SubmitJobOnce([&] {
203         Media::AutoLock lock(mutex_);
204         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
205             if (*it == nullptr) {
206                 continue;
207             }
208             (*it)->Flush();
209         }
210     });
211     AVTRANS_LOGI("Flush end.");
212     return Status::OK;
213 }
214 
Release()215 Status Pipeline::Release()
216 {
217     AVTRANS_LOGI("Release enter.");
218     Media::SubmitJobOnce([&] {
219         Media::AutoLock lock(mutex_);
220         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
221             if (*it == nullptr) {
222                 continue;
223             }
224             (*it)->Release();
225         }
226         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
227             if (*it == nullptr) {
228                 continue;
229             }
230             (*it)->WaitAllState(FilterState::RELEASED);
231         }
232         filters_.clear();
233     });
234     AVTRANS_LOGI("Release done.");
235     return Status::OK;
236 }
237 
Preroll(bool render)238 Status Pipeline::Preroll(bool render)
239 {
240     AVTRANS_LOGI("Preroll enter.");
241     Status ret = Status::OK;
242     Media::AutoLock lock(mutex_);
243     for (auto it = filters_.begin(); it != filters_.end(); ++it) {
244         if (*it == nullptr) {
245             continue;
246         }
247         auto rtv = (*it)->Preroll();
248         if (rtv != Status::OK) {
249             ret = rtv;
250             AVTRANS_LOGI("Preroll done ret = %{public}d", ret);
251             return ret;
252         }
253     }
254     for (auto it = filters_.begin(); it != filters_.end(); ++it) {
255         if (*it == nullptr) {
256             continue;
257         }
258         auto rtv = (*it)->WaitPrerollDone(render);
259         if (rtv != Status::OK) {
260             ret = rtv;
261             AVTRANS_LOGI("Preroll done ret = %{public}d", ret);
262             return ret;
263         }
264     }
265     AVTRANS_LOGI("Preroll done ret = %{public}d", ret);
266     return ret;
267 }
268 
SetPlayRange(int64_t start,int64_t end)269 Status Pipeline::SetPlayRange(int64_t start, int64_t end)
270 {
271     AVTRANS_LOGI("SetPlayRange enter.");
272     Media::SubmitJobOnce([&] {
273         Media::AutoLock lock(mutex_);
274         for (auto it = filters_.begin(); it != filters_.end(); ++it) {
275             if (*it == nullptr) {
276                 continue;
277             }
278             (*it)->SetPlayRange(start, end);
279         }
280     });
281     AVTRANS_LOGI("SetPlayRange done.");
282     return Status::OK;
283 }
284 
AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)285 Status Pipeline::AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)
286 {
287     AVTRANS_LOGI("AddHeadFilters enter.");
288     std::vector<std::shared_ptr<Filter>> filtersToAdd;
289     for (auto& filterIn : filtersIn) {
290         if (filterIn == nullptr) {
291             continue;
292         }
293         bool matched = false;
294         for (const auto& filter : filters_) {
295             if (filterIn == filter) {
296                 matched = true;
297                 break;
298             }
299         }
300         if (!matched) {
301             filtersToAdd.push_back(filterIn);
302             filterIn->LinkPipeLine(groupId_);
303         }
304     }
305     if (filtersToAdd.empty()) {
306         AVTRANS_LOGI("filter already exists");
307         return Status::OK;
308     }
309     Media::SubmitJobOnce([&] {
310         Media::AutoLock lock(mutex_);
311         this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end());
312     });
313     AVTRANS_LOGI("AddHeadFilters done.");
314     return Status::OK;
315 }
316 
RemoveHeadFilter(const std::shared_ptr<Filter> & filter)317 Status Pipeline::RemoveHeadFilter(const std::shared_ptr<Filter>& filter)
318 {
319     Media::SubmitJobOnce([&] {
320         Media::AutoLock lock(mutex_);
321         auto it = std::find_if(filters_.begin(), filters_.end(),
322                                [&filter](const std::shared_ptr<Filter>& filterPtr) { return filterPtr == filter; });
323         if (it != filters_.end()) {
324             filters_.erase(it);
325         }
326         if (filter != nullptr) {
327             filter->Release();
328             filter->WaitAllState(FilterState::RELEASED);
329             filter->ClearAllNextFilters();
330         }
331         return Status::OK;
332     });
333     return Status::OK;
334 }
335 
LinkFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)336 Status Pipeline::LinkFilters(const std::shared_ptr<Filter> &preFilter,
337                              const std::vector<std::shared_ptr<Filter>> &nextFilters,
338                              StreamType type)
339 {
340     TRUE_RETURN_V(preFilter == nullptr, Status::ERROR_NULL_POINTER);
341     for (auto nextFilter : nextFilters) {
342         TRUE_RETURN_V(nextFilter == nullptr, Status::ERROR_NULL_POINTER);
343         auto ret = preFilter->LinkNext(nextFilter, type);
344         nextFilter->LinkPipeLine(groupId_);
345         TRUE_RETURN_V(ret != Status::OK, ret);
346     }
347     return Status::OK;
348 }
349 
UpdateFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)350 Status Pipeline::UpdateFilters(const std::shared_ptr<Filter> &preFilter,
351                                const std::vector<std::shared_ptr<Filter>> &nextFilters,
352                                StreamType type)
353 {
354     TRUE_RETURN_V(preFilter == nullptr, Status::ERROR_NULL_POINTER);
355     for (auto nextFilter : nextFilters) {
356         preFilter->UpdateNext(nextFilter, type);
357     }
358     return Status::OK;
359 }
360 
UnLinkFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)361 Status Pipeline::UnLinkFilters(const std::shared_ptr<Filter> &preFilter,
362                                const std::vector<std::shared_ptr<Filter>> &nextFilters,
363                                StreamType type)
364 {
365     TRUE_RETURN_V(preFilter == nullptr, Status::ERROR_NULL_POINTER);
366     for (auto nextFilter : nextFilters) {
367         preFilter->UnLinkNext(nextFilter, type);
368     }
369     return Status::OK;
370 }
371 
OnEvent(const Event & event)372 void Pipeline::OnEvent(const Event& event)
373 {
374 }
375 } // namespace Pipeline
376 } // namespace DistributedHardware
377 } // namespace OHOS
378