• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "PipelineCore"
17 
18 #include "pipeline/core/pipeline_core.h"
19 #include <queue>
20 #include <stack>
21 #include "foundation/log.h"
22 #include "foundation/osal/thread/scoped_lock.h"
23 #include "foundation/utils/hitrace_utils.h"
24 #include "foundation/utils/steady_clock.h"
25 
26 namespace OHOS {
27 namespace Media {
28 namespace Pipeline {
Init(EventReceiver * receiver,FilterCallback * callback)29 void PipelineCore::Init(EventReceiver* receiver, FilterCallback* callback)
30 {
31     eventReceiver_ = receiver;
32     filterCallback_ = callback;
33     state_ = FilterState::INITIALIZED;
34 }
35 
Prepare()36 ErrorCode PipelineCore::Prepare()
37 {
38     state_ = FilterState::PREPARING;
39     ErrorCode rtv = ErrorCode::SUCCESS;
40     OSAL::ScopedLock lock(mutex_);
41     ReorderFilters();
42     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
43         auto& filterPtr = *it;
44         if (filterPtr) {
45             if ((rtv = filterPtr->Prepare()) != ErrorCode::SUCCESS) {
46                 break;
47             }
48         } else {
49             MEDIA_LOG_E("invalid pointer in filters.");
50         }
51     }
52     return rtv;
53 }
54 
Start()55 ErrorCode PipelineCore::Start()
56 {
57     state_ = FilterState::RUNNING;
58     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
59         auto rtv = (*it)->Start();
60         FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
61     }
62     return ErrorCode::SUCCESS;
63 }
64 
Pause()65 ErrorCode PipelineCore::Pause()
66 {
67     if (state_ == FilterState::PAUSED) {
68         return ErrorCode::SUCCESS;
69     }
70     if (state_ != FilterState::READY && state_ != FilterState::RUNNING) {
71         return ErrorCode::ERROR_INVALID_OPERATION;
72     }
73     state_ = FilterState::PAUSED;
74     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
75         if ((*it)->Pause() != ErrorCode::SUCCESS) {
76             MEDIA_LOG_I("pause filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
77         }
78     }
79     return ErrorCode::SUCCESS;
80 }
81 
Resume()82 ErrorCode PipelineCore::Resume()
83 {
84     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
85         MEDIA_LOG_I("Resume filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
86         auto rtv = (*it)->Resume();
87         FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
88     }
89     state_ = FilterState::RUNNING;
90     return ErrorCode::SUCCESS;
91 }
92 
Stop()93 ErrorCode PipelineCore::Stop()
94 {
95     readyEventCnt_ = 0;
96     state_ = FilterState::INITIALIZED;
97     filtersToRemove_.clear();
98     filtersToRemove_.reserve(filters_.size());
99     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
100         if (*it == nullptr) {
101             MEDIA_LOG_E("PipelineCore error: " PUBLIC_LOG_ZU, filters_.size());
102             continue;
103         }
104         MEDIA_LOG_I("Stop filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
105         PROFILE_BEGIN();
106         auto rtv = (*it)->Stop();
107         PROFILE_END("Stop finished for %s", (*it)->GetName().c_str());
108         FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
109     }
110     for (const auto& filter : filtersToRemove_) {
111         RemoveFilter(filter);
112     }
113     MEDIA_LOG_I("Stop finished, filter number: " PUBLIC_LOG_ZU, filters_.size());
114     return ErrorCode::SUCCESS;
115 }
116 
FlushStart()117 void PipelineCore::FlushStart()
118 {
119     SYNC_TRACER();
120     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
121         MEDIA_LOG_I("FlushStart for filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
122         (*it)->FlushStart();
123     }
124 }
125 
FlushEnd()126 void PipelineCore::FlushEnd()
127 {
128     SYNC_TRACER();
129     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
130         MEDIA_LOG_I("FlushEnd for filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
131         (*it)->FlushEnd();
132     }
133 }
134 
GetState()135 FilterState PipelineCore::GetState()
136 {
137     return state_;
138 }
139 
AddFilters(std::initializer_list<Filter * > filtersIn)140 ErrorCode PipelineCore::AddFilters(std::initializer_list<Filter*> filtersIn)
141 {
142     std::vector<Filter*> filtersToAdd;
143     for (auto& filterIn : filtersIn) {
144         bool matched = false;
145         for (const auto& filter : filters_) {
146             if (filterIn == filter) {
147                 matched = true;
148                 break;
149             }
150         }
151         if (!matched) {
152             filtersToAdd.push_back(filterIn);
153         }
154     }
155     if (filtersToAdd.empty()) {
156         MEDIA_LOG_I("filters already exists");
157         return ErrorCode::SUCCESS;
158     }
159     {
160         OSAL::ScopedLock lock(mutex_);
161         this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end());
162     }
163     InitFilters(filtersToAdd);
164     return ErrorCode::SUCCESS;
165 }
166 
RemoveFilter(Filter * filter)167 ErrorCode PipelineCore::RemoveFilter(Filter* filter)
168 {
169     auto it = std::find_if(filters_.begin(), filters_.end(),
170                            [&filter](const Filter* filterPtr) { return filterPtr == filter; });
171     if (it != filters_.end()) {
172         MEDIA_LOG_I("RemoveFilter " PUBLIC_LOG_S, (*it)->GetName().c_str());
173         filters_.erase(it);
174     }
175     return ErrorCode::SUCCESS;
176 }
177 
RemoveFilterChain(Filter * firstFilter)178 ErrorCode PipelineCore::RemoveFilterChain(Filter* firstFilter)
179 {
180     if (!firstFilter) {
181         return ErrorCode::ERROR_INVALID_PARAMETER_VALUE;
182     }
183     std::queue<Filter*> levelFilters;
184     levelFilters.push(firstFilter);
185     while (!levelFilters.empty()) {
186         auto filter = levelFilters.front();
187         levelFilters.pop();
188         filter->UnlinkPrevFilters();
189         filtersToRemove_.push_back(filter);
190         for (auto&& nextFilter : filter->GetNextFilters()) {
191             levelFilters.push(nextFilter);
192         }
193     }
194     return ErrorCode::SUCCESS;
195 }
196 
LinkFilters(std::initializer_list<Filter * > filters)197 ErrorCode PipelineCore::LinkFilters(std::initializer_list<Filter*> filters)
198 {
199     std::vector<Filter*> filtersToLink;
200     std::vector<Filter*>(filters).swap(filtersToLink);
201     int count = std::max((int)(filtersToLink.size()) - 1, 0);
202     for (int i = 0; i < count; i++) {
203         filtersToLink[i]->GetOutPort(PORT_NAME_DEFAULT)->Connect(filtersToLink[i + 1]->GetInPort(PORT_NAME_DEFAULT));
204         filtersToLink[i + 1]->GetInPort(PORT_NAME_DEFAULT)->Connect(filtersToLink[i]->GetOutPort(PORT_NAME_DEFAULT));
205     }
206     return ErrorCode::SUCCESS;
207 }
208 
LinkPorts(std::shared_ptr<OutPort> port1,std::shared_ptr<InPort> port2)209 ErrorCode PipelineCore::LinkPorts(std::shared_ptr<OutPort> port1, std::shared_ptr<InPort> port2)
210 {
211     FAIL_RETURN(port1->Connect(port2));
212     FAIL_RETURN(port2->Connect(port1));
213     return ErrorCode::SUCCESS;
214 }
NotifyEvent(const Event & event)215 void PipelineCore::NotifyEvent(const Event& event)
216 {
217     if (eventReceiver_) {
218         eventReceiver_->OnEvent(event);
219     } else {
220         MEDIA_LOG_I("no event receiver when receive type " PUBLIC_LOG_D32, event.type);
221     }
222 }
OnEvent(const Event & event)223 void PipelineCore::OnEvent(const Event& event)
224 {
225     if (event.type != EventType::EVENT_READY) {
226         NotifyEvent(event);
227         return;
228     }
229 
230     readyEventCnt_++;
231     MEDIA_LOG_I("OnEvent readyCnt: " PUBLIC_LOG_ZU " / " PUBLIC_LOG_ZU, readyEventCnt_, filters_.size());
232     if (readyEventCnt_ == filters_.size()) {
233         state_ = FilterState::READY;
234         NotifyEvent(event);
235         readyEventCnt_ = 0;
236     }
237 }
238 
InitFilters(const std::vector<Filter * > & filters)239 void PipelineCore::InitFilters(const std::vector<Filter*>& filters)
240 {
241     for (auto& filter : filters) {
242         filter->Init(this, filterCallback_);
243     }
244 }
245 
246 namespace {
247 struct FilterNode {
248     size_t inDegree {0};
249     Filter* filter {nullptr};
250     std::vector<size_t> nexts {}; // store filter index
FilterNodeOHOS::Media::Pipeline::__anoneea12a5c0211::FilterNode251     FilterNode(size_t degree, Filter* f, std::vector<size_t> next) : inDegree(degree), filter(f), nexts(std::move(next))
252     {
253     }
254 };
255 
ConstructGraph(const std::vector<Filter * > & filters,std::vector<FilterNode> & graph,std::stack<size_t> & stack)256 void ConstructGraph(const std::vector<Filter*> &filters, std::vector<FilterNode> &graph, std::stack<size_t> &stack)
257 {
258     std::map<Filter*, size_t> mapInfo; // filter to index map, index is the node index in graph
259     size_t index = 0;
260     for (const auto& f : filters) {
261         graph.emplace_back(FilterNode(f->GetPreFilters().size(), f, {}));
262         if (f->GetPreFilters().empty()) {
263             stack.push(index);
264         }
265         mapInfo[f] = index++;
266     }
267     for (const auto& f : filters) {
268         auto& tmp = graph[mapInfo[f]].nexts;
269         for (const auto& next : f->GetNextFilters()) {
270             tmp.emplace_back(mapInfo[next]);
271         }
272     }
273 }
274 }
275 
ReorderFilters()276 void PipelineCore::ReorderFilters()
277 {
278     std::vector<FilterNode> graph;
279     std::stack<size_t> stack;
280     ConstructGraph(filters_, graph, stack);
281     std::vector<Filter*> result;
282     while (!stack.empty()) {
283         auto cur = stack.top();
284         stack.pop();
285         for (const auto& idx : graph[cur].nexts) {
286             graph[idx].inDegree--;
287             if (graph[idx].inDegree == 0) {
288                 stack.push(idx);
289             }
290         }
291         result.emplace_back(graph[cur].filter);
292     }
293     if (result.size() == filters_.size()) {
294         filters_.clear();
295         filters_.assign(result.begin(), result.end());
296     }
297 }
298 } // namespace Pipeline
299 } // namespace Media
300 } // namespace OHOS
301