/*
 * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15
16 #define HST_LOG_TAG "PipelineCore"
17
#include "pipeline_core.h"
#include <algorithm>
#include <map>
#include <queue>
#include <stack>
#include "foundation/log.h"
#include "osal/thread/scoped_lock.h"
#include "utils/steady_clock.h"
24
25 namespace OHOS {
26 namespace Media {
27 namespace Pipeline {
GeTrackMeta(int32_t trackId)28 std::shared_ptr<const Plugin::Meta> OHOS::Media::Pipeline::MetaBundle::GeTrackMeta(int32_t trackId)
29 {
30 for (auto& ptr : trackMeta_) {
31 uint32_t found = 0;
32 if (ptr->GetUint32(Plugin::MetaID::TRACK_ID, found) && found == trackId) {
33 return ptr;
34 }
35 }
36 return nullptr;
37 }
38
UpdateGlobalMeta(const Plugin::Meta & meta)39 void MetaBundle::UpdateGlobalMeta(const Plugin::Meta& meta)
40 {
41 if (globalMeta_ == nullptr) {
42 globalMeta_ = std::make_shared<Plugin::Meta>();
43 }
44 globalMeta_->Update(meta);
45 }
46
UpdateTrackMeta(const Plugin::Meta & meta)47 void MetaBundle::UpdateTrackMeta(const Plugin::Meta& meta)
48 {
49 uint32_t trackId = 0;
50 if (!meta.GetUint32(Plugin::MetaID::TRACK_ID, trackId)) {
51 MEDIA_LOG_W("update stream meta with invalid meta, which contains no track id, will ignore this meta");
52 return;
53 }
54 for (const auto& tmp : trackMeta_) {
55 uint32_t tid = 0;
56 if (tmp->GetUint32(Plugin::MetaID::TRACK_ID, tid) && trackId == tid) {
57 tmp->Update(meta);
58 return;
59 }
60 }
61 auto ptr = std::make_shared<Plugin::Meta>();
62 ptr->Update(meta);
63 trackMeta_.emplace_back(ptr);
64 }
65
PipelineCore(const std::string & name)66 PipelineCore::PipelineCore(const std::string& name)
67 : name_(name), eventReceiver_(nullptr), filterCallback_(nullptr), metaBundle_(std::make_shared<MetaBundle>())
68 {
69 }
70
GetName()71 const std::string& PipelineCore::GetName()
72 {
73 return name_;
74 }
75
GetOwnerPipeline() const76 const EventReceiver* PipelineCore::GetOwnerPipeline() const
77 {
78 return eventReceiver_;
79 }
80
Init(EventReceiver * receiver,FilterCallback * callback)81 void PipelineCore::Init(EventReceiver* receiver, FilterCallback* callback)
82 {
83 eventReceiver_ = receiver;
84 filterCallback_ = callback;
85 state_ = FilterState::INITIALIZED;
86 }
87
Prepare()88 ErrorCode PipelineCore::Prepare()
89 {
90 state_ = FilterState::PREPARING;
91 ErrorCode rtv = ErrorCode::SUCCESS;
92 OSAL::ScopedLock lock(mutex_);
93 ReorderFilters();
94 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
95 auto& filterPtr = *it;
96 if (filterPtr) {
97 if ((rtv = filterPtr->Prepare()) != ErrorCode::SUCCESS) {
98 break;
99 }
100 } else {
101 MEDIA_LOG_E("invalid pointer in filters.");
102 }
103 }
104 return rtv;
105 }
106
Start()107 ErrorCode PipelineCore::Start()
108 {
109 state_ = FilterState::RUNNING;
110 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
111 auto rtv = (*it)->Start();
112 FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
113 }
114 return ErrorCode::SUCCESS;
115 }
116
Pause()117 ErrorCode PipelineCore::Pause()
118 {
119 if (state_ == FilterState::PAUSED) {
120 return ErrorCode::SUCCESS;
121 }
122 if (state_ != FilterState::READY && state_ != FilterState::RUNNING) {
123 return ErrorCode::ERROR_INVALID_OPERATION;
124 }
125 state_ = FilterState::PAUSED;
126 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
127 if ((*it)->Pause() != ErrorCode::SUCCESS) {
128 MEDIA_LOG_I("pause filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
129 }
130 }
131 return ErrorCode::SUCCESS;
132 }
133
Resume()134 ErrorCode PipelineCore::Resume()
135 {
136 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
137 MEDIA_LOG_I("Resume filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
138 auto rtv = (*it)->Resume();
139 FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
140 }
141 state_ = FilterState::RUNNING;
142 return ErrorCode::SUCCESS;
143 }
144
Stop()145 ErrorCode PipelineCore::Stop()
146 {
147 readyEventCnt_ = 0;
148 state_ = FilterState::INITIALIZED;
149 filtersToRemove_.clear();
150 filtersToRemove_.reserve(filters_.size());
151 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
152 if (*it == nullptr) {
153 MEDIA_LOG_E("PipelineCore error: " PUBLIC_LOG_ZU, filters_.size());
154 continue;
155 }
156 MEDIA_LOG_I("Stop filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
157 PROFILE_BEGIN();
158 auto rtv = (*it)->Stop();
159 PROFILE_END("Stop finished for %s", (*it)->GetName().c_str());
160 FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
161 }
162 for (const auto& filter : filtersToRemove_) {
163 RemoveFilter(filter);
164 }
165 MEDIA_LOG_I("Stop finished, filter number: " PUBLIC_LOG_ZU, filters_.size());
166 return ErrorCode::SUCCESS;
167 }
168
FlushStart()169 void PipelineCore::FlushStart()
170 {
171 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
172 MEDIA_LOG_I("FlushStart for filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
173 (*it)->FlushStart();
174 }
175 }
176
FlushEnd()177 void PipelineCore::FlushEnd()
178 {
179 for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
180 MEDIA_LOG_I("FlushEnd for filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
181 (*it)->FlushEnd();
182 }
183 }
184
GetState()185 FilterState PipelineCore::GetState()
186 {
187 return state_;
188 }
189
AddFilters(std::initializer_list<Filter * > filtersIn)190 ErrorCode PipelineCore::AddFilters(std::initializer_list<Filter*> filtersIn)
191 {
192 std::vector<Filter*> filtersToAdd;
193 for (auto& filterIn : filtersIn) {
194 bool matched = false;
195 for (auto& filter : filters_) {
196 if (filterIn == filter) {
197 matched = true;
198 break;
199 }
200 }
201 if (!matched) {
202 filtersToAdd.push_back(filterIn);
203 }
204 }
205 if (filtersToAdd.empty()) {
206 MEDIA_LOG_I("filters already exists");
207 return ErrorCode::SUCCESS;
208 }
209 {
210 OSAL::ScopedLock lock(mutex_);
211 this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end());
212 }
213 InitFilters(filtersToAdd);
214 return ErrorCode::SUCCESS;
215 }
216
RemoveFilter(Filter * filter)217 ErrorCode PipelineCore::RemoveFilter(Filter* filter)
218 {
219 auto it = std::find_if(filters_.begin(), filters_.end(),
220 [&filter](const Filter* filterPtr) { return filterPtr == filter; });
221 if (it != filters_.end()) {
222 MEDIA_LOG_I("RemoveFilter " PUBLIC_LOG_S, (*it)->GetName().c_str());
223 filters_.erase(it);
224 }
225 return ErrorCode::SUCCESS;
226 }
227
RemoveFilterChain(Filter * firstFilter)228 ErrorCode PipelineCore::RemoveFilterChain(Filter* firstFilter)
229 {
230 if (!firstFilter) {
231 return ErrorCode::ERROR_INVALID_PARAMETER_VALUE;
232 }
233 std::queue<Filter*> levelFilters;
234 levelFilters.push(firstFilter);
235 while (!levelFilters.empty()) {
236 auto filter = levelFilters.front();
237 levelFilters.pop();
238 filter->UnlinkPrevFilters();
239 filtersToRemove_.push_back(filter);
240 for (auto&& nextFilter : filter->GetNextFilters()) {
241 levelFilters.push(nextFilter);
242 }
243 }
244 return ErrorCode::SUCCESS;
245 }
246
LinkFilters(std::initializer_list<Filter * > filters)247 ErrorCode PipelineCore::LinkFilters(std::initializer_list<Filter*> filters)
248 {
249 std::vector<Filter*> filtersToLink;
250 std::vector<Filter*>(filters).swap(filtersToLink);
251 int count = std::max((int)(filtersToLink.size()) - 1, 0);
252 for (int i = 0; i < count; i++) {
253 filtersToLink[i]->GetOutPort(PORT_NAME_DEFAULT)->Connect(filtersToLink[i + 1]->GetInPort(PORT_NAME_DEFAULT));
254 filtersToLink[i + 1]->GetInPort(PORT_NAME_DEFAULT)->Connect(filtersToLink[i]->GetOutPort(PORT_NAME_DEFAULT));
255 }
256 return ErrorCode::SUCCESS;
257 }
258
LinkPorts(std::shared_ptr<OutPort> port1,std::shared_ptr<InPort> port2)259 ErrorCode PipelineCore::LinkPorts(std::shared_ptr<OutPort> port1, std::shared_ptr<InPort> port2)
260 {
261 FAIL_RETURN(port1->Connect(port2));
262 FAIL_RETURN(port2->Connect(port1));
263 return ErrorCode::SUCCESS;
264 }
NotifyEvent(const Event & event)265 void PipelineCore::NotifyEvent(const Event& event)
266 {
267 if (eventReceiver_) {
268 eventReceiver_->OnEvent(event);
269 } else {
270 MEDIA_LOG_I("no event receiver when receive type " PUBLIC_LOG_D32, event.type);
271 }
272 }
OnEvent(const Event & event)273 void PipelineCore::OnEvent(const Event& event)
274 {
275 if (event.type != EventType::EVENT_READY) {
276 NotifyEvent(event);
277 return;
278 }
279
280 readyEventCnt_++;
281 MEDIA_LOG_I("OnEvent readyCnt: " PUBLIC_LOG_ZU " / " PUBLIC_LOG_ZU, readyEventCnt_, filters_.size());
282 if (readyEventCnt_ == filters_.size()) {
283 state_ = FilterState::READY;
284 NotifyEvent(event);
285 readyEventCnt_ = 0;
286 }
287 }
288
InitFilters(const std::vector<Filter * > & filters)289 void PipelineCore::InitFilters(const std::vector<Filter*>& filters)
290 {
291 for (auto& filter : filters) {
292 filter->Init(this, filterCallback_);
293 }
294 }
295
296 namespace {
297 struct FilterNode {
298 size_t inDegree {0};
299 Filter* filter {nullptr};
300 std::vector<size_t> nexts {}; // store filter index
FilterNodeOHOS::Media::Pipeline::__anon4558845d0211::FilterNode301 FilterNode(size_t degree, Filter* f, std::vector<size_t> next) : inDegree(degree), filter(f), nexts(std::move(next))
302 {
303 }
304 };
305
ConstructGraph(const std::vector<Filter * > & filters,std::vector<FilterNode> & graph,std::stack<size_t> & stack)306 void ConstructGraph(const std::vector<Filter*> &filters, std::vector<FilterNode> &graph, std::stack<size_t> &stack)
307 {
308 std::map<Filter*, size_t> mapInfo; // filter to index map, index is the node index in graph
309 size_t index = 0;
310 for (const auto& f : filters) {
311 graph.emplace_back(FilterNode(f->GetPreFilters().size(), f, {}));
312 if (f->GetPreFilters().empty()) {
313 stack.push(index);
314 }
315 mapInfo[f] = index++;
316 }
317 for (const auto& f : filters) {
318 auto& tmp = graph[mapInfo[f]].nexts;
319 for (const auto& next : f->GetNextFilters()) {
320 tmp.emplace_back(mapInfo[next]);
321 }
322 }
323 }
324 }
325
ReorderFilters()326 void PipelineCore::ReorderFilters()
327 {
328 std::vector<FilterNode> graph;
329 std::stack<size_t> stack;
330 ConstructGraph(filters_, graph, stack);
331 std::vector<Filter*> result;
332 while (!stack.empty()) {
333 auto cur = stack.top();
334 stack.pop();
335 for (const auto& idx : graph[cur].nexts) {
336 graph[idx].inDegree--;
337 if (graph[idx].inDegree == 0) {
338 stack.push(idx);
339 }
340 }
341 result.emplace_back(graph[cur].filter);
342 }
343 if (result.size() == filters_.size()) {
344 filters_.clear();
345 filters_.assign(result.begin(), result.end());
346 }
347 }
348
SetSyncCenter(std::weak_ptr<IMediaSyncCenter> syncCenter)349 void PipelineCore::SetSyncCenter(std::weak_ptr<IMediaSyncCenter> syncCenter)
350 {
351 syncCenter_ = syncCenter;
352 }
353 } // namespace Pipeline
354 } // namespace Media
355 } // namespace OHOS
356