• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "PipelineCore"
17 
18 #include "pipeline_core.h"
19 #include <queue>
20 #include <stack>
21 #include "foundation/log.h"
22 #include "osal/thread/scoped_lock.h"
23 #include "utils/steady_clock.h"
24 
25 namespace OHOS {
26 namespace Media {
27 namespace Pipeline {
GeTrackMeta(int32_t trackId)28 std::shared_ptr<const Plugin::Meta> OHOS::Media::Pipeline::MetaBundle::GeTrackMeta(int32_t trackId)
29 {
30     for (auto& ptr : trackMeta_) {
31         uint32_t found = 0;
32         if (ptr->GetUint32(Plugin::MetaID::TRACK_ID, found) && found == trackId) {
33             return ptr;
34         }
35     }
36     return nullptr;
37 }
38 
UpdateGlobalMeta(const Plugin::Meta & meta)39 void MetaBundle::UpdateGlobalMeta(const Plugin::Meta& meta)
40 {
41     if (globalMeta_ == nullptr) {
42         globalMeta_ = std::make_shared<Plugin::Meta>();
43     }
44     globalMeta_->Update(meta);
45 }
46 
UpdateTrackMeta(const Plugin::Meta & meta)47 void MetaBundle::UpdateTrackMeta(const Plugin::Meta& meta)
48 {
49     uint32_t trackId = 0;
50     if (!meta.GetUint32(Plugin::MetaID::TRACK_ID, trackId)) {
51         MEDIA_LOG_W("update stream meta with invalid meta, which contains no track id, will ignore this meta");
52         return;
53     }
54     for (const auto& tmp : trackMeta_) {
55         uint32_t tid = 0;
56         if (tmp->GetUint32(Plugin::MetaID::TRACK_ID, tid) && trackId == tid) {
57             tmp->Update(meta);
58             return;
59         }
60     }
61     auto ptr = std::make_shared<Plugin::Meta>();
62     ptr->Update(meta);
63     trackMeta_.emplace_back(ptr);
64 }
65 
PipelineCore(const std::string & name)66 PipelineCore::PipelineCore(const std::string& name)
67     : name_(name), eventReceiver_(nullptr), filterCallback_(nullptr), metaBundle_(std::make_shared<MetaBundle>())
68 {
69 }
70 
GetName()71 const std::string& PipelineCore::GetName()
72 {
73     return name_;
74 }
75 
GetOwnerPipeline() const76 const EventReceiver* PipelineCore::GetOwnerPipeline() const
77 {
78     return eventReceiver_;
79 }
80 
Init(EventReceiver * receiver,FilterCallback * callback)81 void PipelineCore::Init(EventReceiver* receiver, FilterCallback* callback)
82 {
83     eventReceiver_ = receiver;
84     filterCallback_ = callback;
85     state_ = FilterState::INITIALIZED;
86 }
87 
Prepare()88 ErrorCode PipelineCore::Prepare()
89 {
90     state_ = FilterState::PREPARING;
91     ErrorCode rtv = ErrorCode::SUCCESS;
92     OSAL::ScopedLock lock(mutex_);
93     ReorderFilters();
94     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
95         auto& filterPtr = *it;
96         if (filterPtr) {
97             if ((rtv = filterPtr->Prepare()) != ErrorCode::SUCCESS) {
98                 break;
99             }
100         } else {
101             MEDIA_LOG_E("invalid pointer in filters.");
102         }
103     }
104     return rtv;
105 }
106 
Start()107 ErrorCode PipelineCore::Start()
108 {
109     state_ = FilterState::RUNNING;
110     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
111         auto rtv = (*it)->Start();
112         FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
113     }
114     return ErrorCode::SUCCESS;
115 }
116 
Pause()117 ErrorCode PipelineCore::Pause()
118 {
119     if (state_ == FilterState::PAUSED) {
120         return ErrorCode::SUCCESS;
121     }
122     if (state_ != FilterState::READY && state_ != FilterState::RUNNING) {
123         return ErrorCode::ERROR_INVALID_OPERATION;
124     }
125     state_ = FilterState::PAUSED;
126     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
127         if ((*it)->Pause() != ErrorCode::SUCCESS) {
128             MEDIA_LOG_I("pause filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
129         }
130     }
131     return ErrorCode::SUCCESS;
132 }
133 
Resume()134 ErrorCode PipelineCore::Resume()
135 {
136     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
137         MEDIA_LOG_I("Resume filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
138         auto rtv = (*it)->Resume();
139         FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
140     }
141     state_ = FilterState::RUNNING;
142     return ErrorCode::SUCCESS;
143 }
144 
Stop()145 ErrorCode PipelineCore::Stop()
146 {
147     readyEventCnt_ = 0;
148     state_ = FilterState::INITIALIZED;
149     filtersToRemove_.clear();
150     filtersToRemove_.reserve(filters_.size());
151     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
152         if (*it == nullptr) {
153             MEDIA_LOG_E("PipelineCore error: " PUBLIC_LOG_ZU, filters_.size());
154             continue;
155         }
156         MEDIA_LOG_I("Stop filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
157         PROFILE_BEGIN();
158         auto rtv = (*it)->Stop();
159         PROFILE_END("Stop finished for %s", (*it)->GetName().c_str());
160         FALSE_RETURN_V(rtv == ErrorCode::SUCCESS, rtv);
161     }
162     for (const auto& filter : filtersToRemove_) {
163         RemoveFilter(filter);
164     }
165     MEDIA_LOG_I("Stop finished, filter number: " PUBLIC_LOG_ZU, filters_.size());
166     return ErrorCode::SUCCESS;
167 }
168 
FlushStart()169 void PipelineCore::FlushStart()
170 {
171     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
172         MEDIA_LOG_I("FlushStart for filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
173         (*it)->FlushStart();
174     }
175 }
176 
FlushEnd()177 void PipelineCore::FlushEnd()
178 {
179     for (auto it = filters_.rbegin(); it != filters_.rend(); ++it) {
180         MEDIA_LOG_I("FlushEnd for filter: " PUBLIC_LOG_S, (*it)->GetName().c_str());
181         (*it)->FlushEnd();
182     }
183 }
184 
GetState()185 FilterState PipelineCore::GetState()
186 {
187     return state_;
188 }
189 
AddFilters(std::initializer_list<Filter * > filtersIn)190 ErrorCode PipelineCore::AddFilters(std::initializer_list<Filter*> filtersIn)
191 {
192     std::vector<Filter*> filtersToAdd;
193     for (auto& filterIn : filtersIn) {
194         bool matched = false;
195         for (auto& filter : filters_) {
196             if (filterIn == filter) {
197                 matched = true;
198                 break;
199             }
200         }
201         if (!matched) {
202             filtersToAdd.push_back(filterIn);
203         }
204     }
205     if (filtersToAdd.empty()) {
206         MEDIA_LOG_I("filters already exists");
207         return ErrorCode::SUCCESS;
208     }
209     {
210         OSAL::ScopedLock lock(mutex_);
211         this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end());
212     }
213     InitFilters(filtersToAdd);
214     return ErrorCode::SUCCESS;
215 }
216 
RemoveFilter(Filter * filter)217 ErrorCode PipelineCore::RemoveFilter(Filter* filter)
218 {
219     auto it = std::find_if(filters_.begin(), filters_.end(),
220                            [&filter](const Filter* filterPtr) { return filterPtr == filter; });
221     if (it != filters_.end()) {
222         MEDIA_LOG_I("RemoveFilter " PUBLIC_LOG_S, (*it)->GetName().c_str());
223         filters_.erase(it);
224     }
225     return ErrorCode::SUCCESS;
226 }
227 
RemoveFilterChain(Filter * firstFilter)228 ErrorCode PipelineCore::RemoveFilterChain(Filter* firstFilter)
229 {
230     if (!firstFilter) {
231         return ErrorCode::ERROR_INVALID_PARAMETER_VALUE;
232     }
233     std::queue<Filter*> levelFilters;
234     levelFilters.push(firstFilter);
235     while (!levelFilters.empty()) {
236         auto filter = levelFilters.front();
237         levelFilters.pop();
238         filter->UnlinkPrevFilters();
239         filtersToRemove_.push_back(filter);
240         for (auto&& nextFilter : filter->GetNextFilters()) {
241             levelFilters.push(nextFilter);
242         }
243     }
244     return ErrorCode::SUCCESS;
245 }
246 
LinkFilters(std::initializer_list<Filter * > filters)247 ErrorCode PipelineCore::LinkFilters(std::initializer_list<Filter*> filters)
248 {
249     std::vector<Filter*> filtersToLink;
250     std::vector<Filter*>(filters).swap(filtersToLink);
251     int count = std::max((int)(filtersToLink.size()) - 1, 0);
252     for (int i = 0; i < count; i++) {
253         filtersToLink[i]->GetOutPort(PORT_NAME_DEFAULT)->Connect(filtersToLink[i + 1]->GetInPort(PORT_NAME_DEFAULT));
254         filtersToLink[i + 1]->GetInPort(PORT_NAME_DEFAULT)->Connect(filtersToLink[i]->GetOutPort(PORT_NAME_DEFAULT));
255     }
256     return ErrorCode::SUCCESS;
257 }
258 
LinkPorts(std::shared_ptr<OutPort> port1,std::shared_ptr<InPort> port2)259 ErrorCode PipelineCore::LinkPorts(std::shared_ptr<OutPort> port1, std::shared_ptr<InPort> port2)
260 {
261     FAIL_RETURN(port1->Connect(port2));
262     FAIL_RETURN(port2->Connect(port1));
263     return ErrorCode::SUCCESS;
264 }
NotifyEvent(const Event & event)265 void PipelineCore::NotifyEvent(const Event& event)
266 {
267     if (eventReceiver_) {
268         eventReceiver_->OnEvent(event);
269     } else {
270         MEDIA_LOG_I("no event receiver when receive type " PUBLIC_LOG_D32, event.type);
271     }
272 }
OnEvent(const Event & event)273 void PipelineCore::OnEvent(const Event& event)
274 {
275     if (event.type != EventType::EVENT_READY) {
276         NotifyEvent(event);
277         return;
278     }
279 
280     readyEventCnt_++;
281     MEDIA_LOG_I("OnEvent readyCnt: " PUBLIC_LOG_ZU " / " PUBLIC_LOG_ZU, readyEventCnt_, filters_.size());
282     if (readyEventCnt_ == filters_.size()) {
283         state_ = FilterState::READY;
284         NotifyEvent(event);
285         readyEventCnt_ = 0;
286     }
287 }
288 
InitFilters(const std::vector<Filter * > & filters)289 void PipelineCore::InitFilters(const std::vector<Filter*>& filters)
290 {
291     for (auto& filter : filters) {
292         filter->Init(this, filterCallback_);
293     }
294 }
295 
296 namespace {
297 struct FilterNode {
298     size_t inDegree {0};
299     Filter* filter {nullptr};
300     std::vector<size_t> nexts {}; // store filter index
FilterNodeOHOS::Media::Pipeline::__anon4558845d0211::FilterNode301     FilterNode(size_t degree, Filter* f, std::vector<size_t> next) : inDegree(degree), filter(f), nexts(std::move(next))
302     {
303     }
304 };
305 
ConstructGraph(const std::vector<Filter * > & filters,std::vector<FilterNode> & graph,std::stack<size_t> & stack)306 void ConstructGraph(const std::vector<Filter*> &filters, std::vector<FilterNode> &graph, std::stack<size_t> &stack)
307 {
308     std::map<Filter*, size_t> mapInfo; // filter to index map, index is the node index in graph
309     size_t index = 0;
310     for (const auto& f : filters) {
311         graph.emplace_back(FilterNode(f->GetPreFilters().size(), f, {}));
312         if (f->GetPreFilters().empty()) {
313             stack.push(index);
314         }
315         mapInfo[f] = index++;
316     }
317     for (const auto& f : filters) {
318         auto& tmp = graph[mapInfo[f]].nexts;
319         for (const auto& next : f->GetNextFilters()) {
320             tmp.emplace_back(mapInfo[next]);
321         }
322     }
323 }
324 }
325 
ReorderFilters()326 void PipelineCore::ReorderFilters()
327 {
328     std::vector<FilterNode> graph;
329     std::stack<size_t> stack;
330     ConstructGraph(filters_, graph, stack);
331     std::vector<Filter*> result;
332     while (!stack.empty()) {
333         auto cur = stack.top();
334         stack.pop();
335         for (const auto& idx : graph[cur].nexts) {
336             graph[idx].inDegree--;
337             if (graph[idx].inDegree == 0) {
338                 stack.push(idx);
339             }
340         }
341         result.emplace_back(graph[cur].filter);
342     }
343     if (result.size() == filters_.size()) {
344         filters_.clear();
345         filters_.assign(result.begin(), result.end());
346     }
347 }
348 
SetSyncCenter(std::weak_ptr<IMediaSyncCenter> syncCenter)349 void PipelineCore::SetSyncCenter(std::weak_ptr<IMediaSyncCenter> syncCenter)
350 {
351     syncCenter_ = syncCenter;
352 }
353 } // namespace Pipeline
354 } // namespace Media
355 } // namespace OHOS
356