1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 * http://www.apache.org/licenses/LICENSE-2.0
7 * Unless required by applicable law or agreed to in writing, software
8 * distributed under the License is distributed on an "AS IS" BASIS,
9 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 * See the License for the specific language governing permissions and
11 * limitations under the License.
12 */
13
14 #include "stream_pipeline_dispatcher.h"
15
16 namespace OHOS::Camera {
17
Create()18 std::unique_ptr<StreamPipelineDispatcher> StreamPipelineDispatcher::Create()
19 {
20 return std::make_unique<StreamPipelineDispatcher>();
21 }
22
GenerateNodeSeq(std::vector<std::shared_ptr<INode>> & nodeVec,const std::shared_ptr<INode> & node)23 void StreamPipelineDispatcher::GenerateNodeSeq(std::vector<std::shared_ptr<INode>>& nodeVec,
24 const std::shared_ptr<INode>& node)
25 {
26 if (node != nullptr) {
27 nodeVec.push_back(node);
28 } else {
29 return;
30 }
31
32 if (node->GetNumberOfInPorts() == 0) {
33 return;
34 }
35
36 for (const auto& it : node->GetInPorts()) {
37 GenerateNodeSeq(nodeVec, it->Peer()->GetNode());
38 }
39 }
40
Update(const std::shared_ptr<Pipeline> & p)41 RetCode StreamPipelineDispatcher::Update(const std::shared_ptr<Pipeline>& p)
42 {
43 std::vector<std::shared_ptr<INode>> sink;
44 for (auto it = p->nodes_.rbegin(); it < p->nodes_.rend(); it++) {
45 if ((*it)->GetNumberOfInPorts() == 1 && (*it)->GetNumberOfOutPorts() == 0) {
46 sink.push_back(*it);
47 }
48 }
49
50 std::unordered_map<int, std::vector<std::shared_ptr<INode>>> seqNode;
51 for (const auto& it : sink) {
52 auto inPorts = it->GetInPorts();
53 if (!inPorts.empty()) {
54 // sink node has only one port, and it is a in-port
55 GenerateNodeSeq(seqNode[inPorts[0]->GetStreamId()], it);
56 }
57 }
58
59 for (auto& [id, pipe] : seqNode) {
60 CutUselessBranch(id, pipe);
61 }
62
63 std::swap(seqNode_, seqNode);
64 CAMERA_LOGI("------------------------Node Seq(UpStream) Dump Begin-------------\n");
65 for (auto [ss, vv] : seqNode_) {
66 CAMERA_LOGI("sink stream id:%{public}d \n", ss);
67 for (auto it : vv) {
68 it->wide_ = static_cast<int32_t>(p->maxWidth_);
69 it->high_ = static_cast<int32_t>(p->maxHeight_);
70 CAMERA_LOGI("seq node name:%{public}s, %{public}d * %{public}d\n",
71 it->GetName().c_str(), it->wide_, it->high_);
72 }
73 }
74 CAMERA_LOGI("------------------------Node Seq(UpStream) Dump End-------------\n");
75
76 SetDispatcherCallback();
77
78 return RC_OK;
79 }
80
SetDispatcherCallback()81 RetCode StreamPipelineDispatcher::SetDispatcherCallback()
82 {
83 RetCode ret = RC_OK;
84 for (auto iter = seqNode_.begin(); iter != seqNode_.end(); iter++) {
85 for (auto it = iter->second.rbegin(); it != iter->second.rend(); it++) {
86 ret = (*it)->SetCallback() | ret;
87 }
88 }
89 return ret;
90 }
91
CutUselessBranch(int32_t streamId,std::vector<std::shared_ptr<INode>> & branch)92 void StreamPipelineDispatcher::CutUselessBranch(int32_t streamId, std::vector<std::shared_ptr<INode>>& branch)
93 {
94 auto it = std::find_if(branch.begin(), branch.end(), [streamId](const std::shared_ptr<INode> node) {
95 auto ports = node->GetOutPorts();
96 bool isSameBranch = true;
97 for (auto port : ports) {
98 if (port->GetStreamId() == streamId) {
99 return false;
100 }
101 isSameBranch = false;
102 }
103 return !isSameBranch;
104 });
105 if (it == branch.end()) {
106 return;
107 }
108 branch.erase(it, branch.end());
109
110 return;
111 }
112
Prepare(const int32_t streamId)113 RetCode StreamPipelineDispatcher::Prepare(const int32_t streamId)
114 {
115 if (seqNode_.count(streamId) == 0) {
116 return RC_ERROR;
117 }
118
119 RetCode ret = RC_OK;
120 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
121 CAMERA_LOGV("init node %{public}s begin", (*it)->GetName().c_str());
122 ret = (*it)->Init(streamId) | ret;
123 CAMERA_LOGV("init node %{public}s end", (*it)->GetName().c_str());
124 }
125 return ret;
126 }
127
Start(const int32_t streamId)128 RetCode StreamPipelineDispatcher::Start(const int32_t streamId)
129 {
130 if (seqNode_.count(streamId) == 0) {
131 return RC_ERROR;
132 }
133
134 RetCode ret = RC_OK;
135 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
136 CAMERA_LOGV("start node %{public}s begin", (*it)->GetName().c_str());
137 ret = (*it)->Start(streamId) | ret;
138 CAMERA_LOGV("start node %{public}s end", (*it)->GetName().c_str());
139 }
140 return ret;
141 }
142
Config(const int32_t streamId,const CaptureMeta & meta)143 RetCode StreamPipelineDispatcher::Config(const int32_t streamId, const CaptureMeta& meta)
144 {
145 if (seqNode_.count(streamId) == 0) {
146 return RC_ERROR;
147 }
148
149 RetCode ret = RC_OK;
150 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
151 ret = (*it)->Config(streamId, meta) | ret;
152 }
153 return ret;
154 }
155
Flush(const int32_t streamId)156 RetCode StreamPipelineDispatcher::Flush(const int32_t streamId)
157 {
158 if (seqNode_.count(streamId) == 0) {
159 return RC_ERROR;
160 }
161
162 RetCode ret = RC_OK;
163 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
164 CAMERA_LOGV("flush node %{public}s begin", (*it)->GetName().c_str());
165 ret = (*it)->Flush(streamId) | ret;
166 CAMERA_LOGV("flush node %{public}s end", (*it)->GetName().c_str());
167 }
168 return ret;
169 }
170
SetCallback(const MetaDataCb cb)171 void StreamPipelineDispatcher::SetCallback(const MetaDataCb cb)
172 {
173 CAMERA_LOGI("StreamPipelineDispatcher line: %{public}d", __LINE__);
174 metaDataCb_ = cb;
175 }
176
Stop(const int32_t streamId)177 RetCode StreamPipelineDispatcher::Stop(const int32_t streamId)
178 {
179 if (seqNode_.count(streamId) == 0) {
180 return RC_OK;
181 }
182
183 RetCode ret = RC_OK;
184 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
185 CAMERA_LOGV("stop node %{public}s begin", (*it)->GetName().c_str());
186 ret = (*it)->Stop(streamId) | ret;
187 CAMERA_LOGV("stop node %{public}s end", (*it)->GetName().c_str());
188 }
189 return ret;
190 }
191
Capture(const int32_t streamId,const int32_t captureId)192 RetCode StreamPipelineDispatcher::Capture(const int32_t streamId, const int32_t captureId)
193 {
194 if (seqNode_.count(streamId) == 0) {
195 return RC_ERROR;
196 }
197
198 RetCode ret = RC_OK;
199 for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
200 ret = (*it)->Capture(streamId, captureId) | ret;
201 }
202
203 return ret;
204 }
205
CancelCapture(const int32_t streamId)206 RetCode StreamPipelineDispatcher::CancelCapture(const int32_t streamId)
207 {
208 if (seqNode_.count(streamId) == 0) {
209 return RC_ERROR;
210 }
211
212 RetCode ret = RC_OK;
213 for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
214 ret = (*it)->CancelCapture(streamId) | ret;
215 }
216
217 return ret;
218 }
219
Destroy(const int32_t streamId)220 RetCode StreamPipelineDispatcher::Destroy(const int32_t streamId)
221 {
222 auto it = seqNode_.find(streamId);
223 if (it == seqNode_.end()) {
224 CAMERA_LOGV("pipeline for stream [id:%{public}d] doesn't exists, no need to destroy.", streamId);
225 return RC_OK;
226 }
227 seqNode_.erase(streamId);
228
229 return RC_OK;
230 }
231
GetNode(const int32_t streamId,const std::string name)232 std::shared_ptr<INode> StreamPipelineDispatcher::GetNode(const int32_t streamId, const std::string name)
233 {
234 if (seqNode_.count(streamId) == 0) {
235 return nullptr;
236 }
237
238 std::shared_ptr<INode> node = nullptr;
239 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
240 if (name == (*it)->GetName().substr(0, 3)) { // 3:Copy length
241 node = *it;
242 }
243 }
244 return node;
245 }
246 }
247