1 /*
2 * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "PipelineCore"
17
18 #include "pipeline/core/pipeline_core.h"
19 #include <queue>
20 #include <stack>
21 #include "foundation/log.h"
22 #include "foundation/osal/thread/scoped_lock.h"
23 #include "foundation/utils/hitrace_utils.h"
24 #include "foundation/utils/steady_clock.h"
25
26 namespace OHOS {
27 namespace Media {
28 namespace Pipeline {
Init(EventReceiver * receiver,FilterCallback * callback)29 void PipelineCore::Init(EventReceiver* receiver, FilterCallback* callback)
30 {
31 eventReceiver_ = receiver;
32 filterCallback_ = callback;
33 state_ = FilterState::INITIALIZED;
34 }
35
Prepare()36 ErrorCode PipelineCore::Prepare()
37 {
38 state_ = FilterState::PREPARING;
39 ErrorCode rtv = ErrorCode::SUCCESS;
40 OSAL::ScopedLock lock(mutex_);
41 ReorderFilters();
42 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
43 auto& filterPtr = *it;
44 if (filterPtr) {
45 if ((rtv = filterPtr->Prepare()) != ErrorCode::SUCCESS) {
46 break;
47 }
48 } else {
49 MEDIA_LOG_E("invalid pointer in filters.");
50 }
51 }
52 return rtv;
53 }
54
Start()55 ErrorCode PipelineCore::Start()
56 {
57 state_ = FilterState::RUNNING;
58 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
59 auto rtv = (*it)->Start();
60 FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
61 }
62 return ErrorCode::SUCCESS;
63 }
64
Pause()65 ErrorCode PipelineCore::Pause()
66 {
67 if (state_ == FilterState::PAUSED) {
68 return ErrorCode::SUCCESS;
69 }
70 if (state_ != FilterState::READY && state_ != FilterState::RUNNING) {
71 return ErrorCode::ERROR_INVALID_OPERATION;
72 }
73 state_ = FilterState::PAUSED;
74 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
75 if ((*it)->Pause() != ErrorCode::SUCCESS) {
76 MEDIA_LOG_I("pause filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
77 }
78 }
79 return ErrorCode::SUCCESS;
80 }
81
Resume()82 ErrorCode PipelineCore::Resume()
83 {
84 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
85 MEDIA_LOG_I("Resume filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
86 auto rtv = (*it)->Resume();
87 FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
88 }
89 state_ = FilterState::RUNNING;
90 return ErrorCode::SUCCESS;
91 }
92
Stop()93 ErrorCode PipelineCore::Stop()
94 {
95 readyEventCnt_ = 0;
96 state_ = FilterState::INITIALIZED;
97 filtersToRemove_.clear();
98 filtersToRemove_.reserve(filters_.size());
99 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
100 if (*it == nullptr) {
101 MEDIA_LOG_E("PipelineCore error: " PUBLIC_LOG_ZU, filters_.size());
102 continue;
103 }
104 MEDIA_LOG_I("Stop filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
105 PROFILE_BEGIN();
106 auto rtv = (*it)->Stop();
107 PROFILE_END("Stop finished for %s", (*it)->GetName().c_str());
108 FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
109 }
110 for (const auto& filter : filtersToRemove_) {
111 RemoveFilter(filter);
112 }
113 MEDIA_LOG_I("Stop finished, filter number: " PUBLIC_LOG_ZU, filters_.size());
114 return ErrorCode::SUCCESS;
115 }
116
FlushStart()117 void PipelineCore::FlushStart()
118 {
119 SYNC_TRACER();
120 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
121 MEDIA_LOG_I("FlushStart for filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
122 (*it)->FlushStart();
123 }
124 }
125
FlushEnd()126 void PipelineCore::FlushEnd()
127 {
128 SYNC_TRACER();
129 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
130 MEDIA_LOG_I("FlushEnd for filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
131 (*it)->FlushEnd();
132 }
133 }
134
GetState()135 FilterState PipelineCore::GetState()
136 {
137 return state_;
138 }
139
AddFilters(std::initializer_list<Filter * > filtersIn)140 ErrorCode PipelineCore::AddFilters(std::initializer_list<Filter*> filtersIn)
141 {
142 std::vector<Filter*> filtersToAdd;
143 for (auto& filterIn : filtersIn) {
144 bool matched = false;
145 for (const auto& filter : filters_) {
146 if (filterIn == filter) {
147 matched = true;
148 break;
149 }
150 }
151 if (!matched) {
152 filtersToAdd.push_back(filterIn);
153 }
154 }
155 if (filtersToAdd.empty()) {
156 MEDIA_LOG_I("filters already exists");
157 return ErrorCode::SUCCESS;
158 }
159 {
160 OSAL::ScopedLock lock(mutex_);
161 this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end());
162 }
163 InitFilters(filtersToAdd);
164 return ErrorCode::SUCCESS;
165 }
166
RemoveFilter(Filter * filter)167 ErrorCode PipelineCore::RemoveFilter(Filter* filter)
168 {
169 auto it = std::find_if(filters_.begin(), filters_.end(),
170 [&filter](const Filter* filterPtr) { return filterPtr == filter; });
171 if (it != filters_.end()) {
172 MEDIA_LOG_I("RemoveFilter " PUBLIC_LOG_S, (*it)->GetName().c_str());
173 filters_.erase(it);
174 }
175 return ErrorCode::SUCCESS;
176 }
177
RemoveFilterChain(Filter * firstFilter)178 ErrorCode PipelineCore::RemoveFilterChain(Filter* firstFilter)
179 {
180 if (!firstFilter) {
181 return ErrorCode::ERROR_INVALID_PARAMETER_VALUE;
182 }
183 std::queue<Filter*> levelFilters;
184 levelFilters.push(firstFilter);
185 while (!levelFilters.empty()) {
186 auto filter = levelFilters.front();
187 levelFilters.pop();
188 filter->UnlinkPrevFilters();
189 filtersToRemove_.push_back(filter);
190 for (auto&& nextFilter : filter->GetNextFilters()) {
191 levelFilters.push(nextFilter);
192 }
193 }
194 return ErrorCode::SUCCESS;
195 }
196
LinkFilters(std::initializer_list<Filter * > filters)197 ErrorCode PipelineCore::LinkFilters(std::initializer_list<Filter*> filters)
198 {
199 std::vector<Filter*> filtersToLink;
200 std::vector<Filter*>(filters).swap(filtersToLink);
201 int count = std::max((int)(filtersToLink.size()) - 1, 0);
202 for (int i = 0; i < count; i++) {
203 filtersToLink[i]->GetOutPort(PORT_NAME_DEFAULT)->Connect(filtersToLink[i + 1]->GetInPort(PORT_NAME_DEFAULT));
204 filtersToLink[i + 1]->GetInPort(PORT_NAME_DEFAULT)->Connect(filtersToLink[i]->GetOutPort(PORT_NAME_DEFAULT));
205 }
206 return ErrorCode::SUCCESS;
207 }
208
LinkPorts(std::shared_ptr<OutPort> port1,std::shared_ptr<InPort> port2)209 ErrorCode PipelineCore::LinkPorts(std::shared_ptr<OutPort> port1, std::shared_ptr<InPort> port2)
210 {
211 FAIL_RETURN(port1->Connect(port2));
212 FAIL_RETURN(port2->Connect(port1));
213 return ErrorCode::SUCCESS;
214 }
NotifyEvent(const Event & event)215 void PipelineCore::NotifyEvent(const Event& event)
216 {
217 if (eventReceiver_) {
218 eventReceiver_->OnEvent(event);
219 } else {
220 MEDIA_LOG_I("no event receiver when receive type " PUBLIC_LOG_D32, event.type);
221 }
222 }
OnEvent(const Event & event)223 void PipelineCore::OnEvent(const Event& event)
224 {
225 if (event.type != EventType::EVENT_READY) {
226 NotifyEvent(event);
227 return;
228 }
229
230 readyEventCnt_++;
231 MEDIA_LOG_I("OnEvent readyCnt: " PUBLIC_LOG_ZU " / " PUBLIC_LOG_ZU, readyEventCnt_, filters_.size());
232 if (readyEventCnt_ == filters_.size()) {
233 state_ = FilterState::READY;
234 NotifyEvent(event);
235 readyEventCnt_ = 0;
236 }
237 }
238
InitFilters(const std::vector<Filter * > & filters)239 void PipelineCore::InitFilters(const std::vector<Filter*>& filters)
240 {
241 for (auto& filter : filters) {
242 filter->Init(this, filterCallback_);
243 }
244 }
245
246 namespace {
247 struct FilterNode {
248 size_t inDegree {0};
249 Filter* filter {nullptr};
250 std::vector<size_t> nexts {}; // store filter index
FilterNodeOHOS::Media::Pipeline::__anoneea12a5c0211::FilterNode251 FilterNode(size_t degree, Filter* f, std::vector<size_t> next) : inDegree(degree), filter(f), nexts(std::move(next))
252 {
253 }
254 };
255
ConstructGraph(const std::vector<Filter * > & filters,std::vector<FilterNode> & graph,std::stack<size_t> & stack)256 void ConstructGraph(const std::vector<Filter*> &filters, std::vector<FilterNode> &graph, std::stack<size_t> &stack)
257 {
258 std::map<Filter*, size_t> mapInfo; // filter to index map, index is the node index in graph
259 size_t index = 0;
260 for (const auto& f : filters) {
261 graph.emplace_back(FilterNode(f->GetPreFilters().size(), f, {}));
262 if (f->GetPreFilters().empty()) {
263 stack.push(index);
264 }
265 mapInfo[f] = index++;
266 }
267 for (const auto& f : filters) {
268 auto& tmp = graph[mapInfo[f]].nexts;
269 for (const auto& next : f->GetNextFilters()) {
270 tmp.emplace_back(mapInfo[next]);
271 }
272 }
273 }
274 }
275
ReorderFilters()276 void PipelineCore::ReorderFilters()
277 {
278 std::vector<FilterNode> graph;
279 std::stack<size_t> stack;
280 ConstructGraph(filters_, graph, stack);
281 std::vector<Filter*> result;
282 while (!stack.empty()) {
283 auto cur = stack.top();
284 stack.pop();
285 for (const auto& idx : graph[cur].nexts) {
286 graph[idx].inDegree--;
287 if (graph[idx].inDegree == 0) {
288 stack.push(idx);
289 }
290 }
291 result.emplace_back(graph[cur].filter);
292 }
293 if (result.size() == filters_.size()) {
294 filters_.clear();
295 filters_.assign(result.begin(), result.end());
296 }
297 }
298 } // namespace Pipeline
299 } // namespace Media
300 } // namespace OHOS
301