1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 * http://www.apache.org/licenses/LICENSE-2.0
7 * Unless required by applicable law or agreed to in writing, software
8 * distributed under the License is distributed on an "AS IS" BASIS,
9 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 * See the License for the specific language governing permissions and
11 * limitations under the License.
12 */
13
14 #include "stream_pipeline_dispatcher.h"
15
16 namespace OHOS::Camera {
17
Create()18 std::unique_ptr<StreamPipelineDispatcher> StreamPipelineDispatcher::Create()
19 {
20 return std::make_unique<StreamPipelineDispatcher>();
21 }
22
GenerateNodeSeq(std::vector<std::shared_ptr<INode>> & nodeVec,const std::shared_ptr<INode> & node)23 void StreamPipelineDispatcher::GenerateNodeSeq(std::vector<std::shared_ptr<INode>>& nodeVec,
24 const std::shared_ptr<INode>& node)
25 {
26 if (node != nullptr) {
27 nodeVec.push_back(node);
28 } else {
29 return;
30 }
31
32 if (node->GetNumberOfInPorts() == 0) {
33 return;
34 }
35
36 for (const auto& it : node->GetInPorts()) {
37 GenerateNodeSeq(nodeVec, it->Peer()->GetNode());
38 }
39 }
40
Update(const std::shared_ptr<Pipeline> & p)41 RetCode StreamPipelineDispatcher::Update(const std::shared_ptr<Pipeline>& p)
42 {
43 std::vector<std::shared_ptr<INode>> sink;
44 for (auto it = p->nodes_.rbegin(); it < p->nodes_.rend(); it++) {
45 if ((*it)->GetNumberOfInPorts() == 1 && (*it)->GetNumberOfOutPorts() == 0) {
46 sink.push_back(*it);
47 }
48 }
49
50 std::unordered_map<int, std::vector<std::shared_ptr<INode>>> seqNode;
51 for (const auto& it : sink) {
52 auto inPorts = it->GetInPorts();
53 if (!inPorts.empty()) {
54 // sink node has only one port, and it is a in-port
55 GenerateNodeSeq(seqNode[inPorts[0]->GetStreamId()], it);
56 }
57 }
58
59 for (auto& [id, pipe] : seqNode) {
60 CutUselessBranch(id, pipe);
61 }
62
63 std::swap(seqNode_, seqNode);
64 CAMERA_LOGI("------------------------Node Seq(UpStream) Dump Begin-------------\n");
65 for (auto [ss, vv] : seqNode_) {
66 CAMERA_LOGI("sink stream id:%{public}d \n", ss);
67 for (auto it : vv) {
68 CAMERA_LOGI("seq node name:%{public}s\n", it->GetName().c_str());
69 }
70 }
71 CAMERA_LOGI("------------------------Node Seq(UpStream) Dump End-------------\n");
72 #ifndef CAMERA_BUILT_ON_OHOS_LITE
73 SetDispatcherCallback();
74 #endif
75
76 return RC_OK;
77 }
78
SetDispatcherCallback()79 RetCode StreamPipelineDispatcher::SetDispatcherCallback()
80 {
81 CAMERA_LOGI("StreamPipelineDispatcher line: %{public}d", __LINE__);
82 RetCode ret = RC_OK;
83 for (auto iter = seqNode_.begin(); iter != seqNode_.end(); iter++) {
84 for (auto it = iter->second.rbegin(); it != iter->second.rend(); it++) {
85 CAMERA_LOGI("SetCallback node %{public}s begin", (*it)->GetName().c_str());
86 ret = (*it)->SetCallback(metaDataCb_) | ret;
87 CAMERA_LOGI("SetCallback node %{public}s end", (*it)->GetName().c_str());
88 }
89 }
90 return ret;
91 }
92
CutUselessBranch(int32_t streamId,std::vector<std::shared_ptr<INode>> & branch)93 void StreamPipelineDispatcher::CutUselessBranch(int32_t streamId, std::vector<std::shared_ptr<INode>>& branch)
94 {
95 auto it = std::find_if(branch.begin(), branch.end(), [streamId](const std::shared_ptr<INode> node) {
96 auto ports = node->GetOutPorts();
97 bool isSameBranch = true;
98 for (auto port : ports) {
99 if (port->GetStreamId() == streamId) {
100 return false;
101 }
102 isSameBranch = false;
103 }
104 return !isSameBranch;
105 });
106 if (it == branch.end()) {
107 return;
108 }
109 branch.erase(it, branch.end());
110
111 return;
112 }
113
Prepare(const int32_t streamId)114 RetCode StreamPipelineDispatcher::Prepare(const int32_t streamId)
115 {
116 if (seqNode_.count(streamId) == 0) {
117 return RC_ERROR;
118 }
119
120 RetCode ret = RC_OK;
121 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
122 CAMERA_LOGV("init node %{public}s begin", (*it)->GetName().c_str());
123 ret = (*it)->Init(streamId) | ret;
124 CAMERA_LOGV("init node %{public}s end", (*it)->GetName().c_str());
125 }
126 return ret;
127 }
128
Start(const int32_t streamId)129 RetCode StreamPipelineDispatcher::Start(const int32_t streamId)
130 {
131 if (seqNode_.count(streamId) == 0) {
132 return RC_ERROR;
133 }
134
135 RetCode ret = RC_OK;
136 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
137 CAMERA_LOGV("start node %{public}s begin", (*it)->GetName().c_str());
138 ret = (*it)->Start(streamId) | ret;
139 CAMERA_LOGV("start node %{public}s end", (*it)->GetName().c_str());
140 }
141 return ret;
142 }
143
Config(const int32_t streamId,const CaptureMeta & meta)144 RetCode StreamPipelineDispatcher::Config(const int32_t streamId, const CaptureMeta& meta)
145 {
146 if (seqNode_.count(streamId) == 0) {
147 return RC_ERROR;
148 }
149
150 RetCode ret = RC_OK;
151 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
152 ret = (*it)->Config(streamId, meta) | ret;
153 }
154 return ret;
155 }
156
UpdateSettingsConfig(const CaptureMeta & meta)157 RetCode StreamPipelineDispatcher::UpdateSettingsConfig(const CaptureMeta& meta)
158 {
159 RetCode ret = RC_OK;
160 for (auto iter = seqNode_.begin(); iter != seqNode_.end(); iter++) {
161 for (auto it = iter->second.rbegin(); it != iter->second.rend(); it++) {
162 CAMERA_LOGV("UpdateSettingsConfig node %{public}s begin", (*it)->GetName().c_str());
163 ret = (*it)->UpdateSettingsConfig(meta) | ret;
164 CAMERA_LOGV("UpdateSettingsConfig node %{public}s end", (*it)->GetName().c_str());
165 }
166 }
167 return ret;
168 }
169
Flush(const int32_t streamId)170 RetCode StreamPipelineDispatcher::Flush(const int32_t streamId)
171 {
172 if (seqNode_.count(streamId) == 0) {
173 return RC_ERROR;
174 }
175
176 RetCode ret = RC_OK;
177 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
178 CAMERA_LOGV("flush node %{public}s begin", (*it)->GetName().c_str());
179 ret = (*it)->Flush(streamId) | ret;
180 CAMERA_LOGV("flush node %{public}s end", (*it)->GetName().c_str());
181 }
182 return ret;
183 }
184
SetCallback(const MetaDataCb cb)185 void StreamPipelineDispatcher::SetCallback(const MetaDataCb cb)
186 {
187 CAMERA_LOGI("StreamPipelineDispatcher line: %{public}d", __LINE__);
188 metaDataCb_ = cb;
189 }
190
Stop(const int32_t streamId)191 RetCode StreamPipelineDispatcher::Stop(const int32_t streamId)
192 {
193 if (seqNode_.count(streamId) == 0) {
194 return RC_OK;
195 }
196
197 RetCode ret = RC_OK;
198 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
199 CAMERA_LOGV("stop node %{public}s begin", (*it)->GetName().c_str());
200 ret = (*it)->Stop(streamId) | ret;
201 CAMERA_LOGV("stop node %{public}s end", (*it)->GetName().c_str());
202 }
203 return ret;
204 }
205
Capture(const int32_t streamId,const int32_t captureId)206 RetCode StreamPipelineDispatcher::Capture(const int32_t streamId, const int32_t captureId)
207 {
208 if (seqNode_.count(streamId) == 0) {
209 return RC_ERROR;
210 }
211
212 RetCode ret = RC_OK;
213 for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
214 ret = (*it)->Capture(streamId, captureId) | ret;
215 }
216
217 return ret;
218 }
219
CancelCapture(const int32_t streamId)220 RetCode StreamPipelineDispatcher::CancelCapture(const int32_t streamId)
221 {
222 if (seqNode_.count(streamId) == 0) {
223 return RC_ERROR;
224 }
225
226 RetCode ret = RC_OK;
227 for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
228 ret = (*it)->CancelCapture(streamId) | ret;
229 }
230
231 return ret;
232 }
233
Destroy(const int32_t streamId)234 RetCode StreamPipelineDispatcher::Destroy(const int32_t streamId)
235 {
236 auto it = seqNode_.find(streamId);
237 if (it == seqNode_.end()) {
238 CAMERA_LOGV("pipeline for stream [id:%{public}d] doesn't exists, no need to destroy.", streamId);
239 return RC_OK;
240 }
241 seqNode_.erase(streamId);
242
243 return RC_OK;
244 }
245
GetNode(const int32_t streamId,const std::string name)246 std::shared_ptr<INode> StreamPipelineDispatcher::GetNode(const int32_t streamId, const std::string name)
247 {
248 if (seqNode_.count(streamId) == 0) {
249 return nullptr;
250 }
251
252 std::shared_ptr<INode> node = nullptr;
253 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
254 if (name == (*it)->GetName().substr(0, 3)) { // 3:Copy length
255 node = *it;
256 }
257 }
258 return node;
259 }
260 }
261