1 /*
2 * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "Pipeline"
17
18 #include <queue>
19 #include <stack>
20 #include "pipeline/pipeline.h"
21 #include "osal/task/autolock.h"
22 #include "osal/task/jobutils.h"
23 #include "common/log.h"
24 #include "osal/utils/hitrace_utils.h"
25
26 namespace OHOS {
27 namespace Media {
28 namespace Pipeline {
~Pipeline()29 Pipeline::~Pipeline() {}
Init(const std::shared_ptr<EventReceiver> & receiver,const std::shared_ptr<FilterCallback> & callback)30 void Pipeline::Init(const std::shared_ptr<EventReceiver>& receiver, const std::shared_ptr<FilterCallback>& callback)
31 {
32 state_ = FilterState::INITIALIZED;
33 eventReceiver_ = receiver;
34 filterCallback_ = callback;
35 }
36
Prepare()37 Status Pipeline::Prepare()
38 {
39 state_ = FilterState::PREPARING;
40 SubmitJobOnce([&] {
41 AutoLock lock(mutex_);
42 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
43 auto rtv = (*it)->Prepare();
44 FALSE_RETURN_V(rtv == Status::OK, rtv);
45 }
46 return Status::OK;
47 });
48 return Status::OK;
49 }
50
Start()51 Status Pipeline::Start()
52 {
53 state_ = FilterState::RUNNING;
54 Status ret = Status::OK;
55 SubmitJobOnce([&] {
56 AutoLock lock(mutex_);
57 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
58 auto rtv = (*it)->Start();
59 if (rtv != Status::OK) {
60 ret = rtv;
61 return;
62 }
63 }
64 });
65 return ret;
66 }
67
Pause()68 Status Pipeline::Pause()
69 {
70 if (state_ == FilterState::PAUSED) {
71 return Status::OK;
72 }
73 if (state_ != FilterState::READY && state_ != FilterState::RUNNING) {
74 return Status::ERROR_UNKNOWN;
75 }
76 state_ = FilterState::PAUSED;
77 SubmitJobOnce([&] {
78 AutoLock lock(mutex_);
79 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
80 if ((*it)->Pause() != Status::OK) {
81 }
82 }
83 return Status::OK;
84 });
85 return Status::OK;
86 }
87
Resume()88 Status Pipeline::Resume()
89 {
90 SubmitJobOnce([&] {
91 AutoLock lock(mutex_);
92 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
93 auto rtv = (*it)->Resume();
94 FALSE_RETURN_V(rtv == Status::OK, rtv);
95 }
96 state_ = FilterState::RUNNING;
97 return Status::OK;
98 });
99 return Status::OK;
100 }
101
Stop()102 Status Pipeline::Stop()
103 {
104 state_ = FilterState::INITIALIZED;
105 SubmitJobOnce([&] {
106 AutoLock lock(mutex_);
107 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
108 if (*it == nullptr) {
109 MEDIA_LOG_E("Pipeline error: " PUBLIC_LOG_ZU, filters_.size());
110 continue;
111 }
112 auto rtv = (*it)->Stop();
113 FALSE_RETURN_V(rtv == Status::OK, rtv);
114 }
115 MEDIA_LOG_I("Stop finished, filter number: " PUBLIC_LOG_ZU, filters_.size());
116 return Status::OK;
117 });
118 return Status::OK;
119 }
120
Flush()121 Status Pipeline::Flush()
122 {
123 SubmitJobOnce([&] {
124 AutoLock lock(mutex_);
125 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
126 (*it)->Flush();
127 }
128 return Status::OK;
129 });
130 return Status::OK;
131 }
132
Release()133 Status Pipeline::Release()
134 {
135 state_ = FilterState::CREATED;
136 SubmitJobOnce([&] {
137 AutoLock lock(mutex_);
138 for (auto it = filters_.begin(); it != filters_.end(); ++it) {
139 (*it)->Release();
140 }
141 filters_.clear();
142 return Status::OK;
143 });
144 return Status::OK;
145 }
146
AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)147 Status Pipeline::AddHeadFilters(std::vector<std::shared_ptr<Filter>> filtersIn)
148 {
149 std::vector<std::shared_ptr<Filter>> filtersToAdd;
150 for (auto& filterIn : filtersIn) {
151 bool matched = false;
152 for (const auto& filter : filters_) {
153 if (filterIn == filter) {
154 matched = true;
155 break;
156 }
157 }
158 if (!matched) {
159 filtersToAdd.push_back(filterIn);
160 }
161 }
162 if (filtersToAdd.empty()) {
163 MEDIA_LOG_I("filter already exists");
164 return Status::OK;
165 }
166 SubmitJobOnce([&] {
167 AutoLock lock(mutex_);
168 this->filters_.insert(this->filters_.end(), filtersToAdd.begin(), filtersToAdd.end());
169 });
170 return Status::OK;
171 }
172
RemoveHeadFilter(const std::shared_ptr<Filter> & filter)173 Status Pipeline::RemoveHeadFilter(const std::shared_ptr<Filter>& filter)
174 {
175 SubmitJobOnce([&] {
176 AutoLock lock(mutex_);
177 auto it = std::find_if(filters_.begin(), filters_.end(),
178 [&filter](const std::shared_ptr<Filter>& filterPtr) { return filterPtr == filter; });
179 if (it != filters_.end()) {
180 filters_.erase(it);
181 }
182 filter->Release();
183 return Status::OK;
184 });
185 return Status::OK;
186 }
187
LinkFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)188 Status Pipeline::LinkFilters(const std::shared_ptr<Filter> &preFilter,
189 const std::vector<std::shared_ptr<Filter>> &nextFilters,
190 StreamType type)
191 {
192 for (auto nextFilter : nextFilters) {
193 auto ret = preFilter->LinkNext(nextFilter, type);
194 FALSE_RETURN_V(ret == Status::OK, ret);
195 }
196 return Status::OK;
197 }
198
UpdateFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)199 Status Pipeline::UpdateFilters(const std::shared_ptr<Filter> &preFilter,
200 const std::vector<std::shared_ptr<Filter>> &nextFilters,
201 StreamType type)
202 {
203 for (auto nextFilter : nextFilters) {
204 preFilter->UpdateNext(nextFilter, type);
205 }
206 return Status::OK;
207 }
208
UnLinkFilters(const std::shared_ptr<Filter> & preFilter,const std::vector<std::shared_ptr<Filter>> & nextFilters,StreamType type)209 Status Pipeline::UnLinkFilters(const std::shared_ptr<Filter> &preFilter,
210 const std::vector<std::shared_ptr<Filter>> &nextFilters,
211 StreamType type)
212 {
213 for (auto nextFilter : nextFilters) {
214 preFilter->UnLinkNext(nextFilter, type);
215 }
216 return Status::OK;
217 }
218
OnEvent(const Event & event)219 void Pipeline::OnEvent(const Event& event)
220 {
221 }
222
223 } // namespace Pipeline
224 } // namespace Media
225 } // namespace OHOS
226