/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
13
14 #include "stream_pipeline_dispatcher.h"
15
16 namespace OHOS::Camera {
17
Create()18 std::unique_ptr<StreamPipelineDispatcher> StreamPipelineDispatcher::Create()
19 {
20 return std::make_unique<StreamPipelineDispatcher>();
21 }
22
GenerateNodeSeq(std::vector<std::shared_ptr<INode>> & nodeVec,const std::shared_ptr<INode> & node)23 void StreamPipelineDispatcher::GenerateNodeSeq(std::vector<std::shared_ptr<INode>>& nodeVec,
24 const std::shared_ptr<INode>& node)
25 {
26 if (node != nullptr) {
27 nodeVec.push_back(node);
28 } else {
29 return;
30 }
31
32 if (node->GetNumberOfInPorts() == 0) {
33 return;
34 }
35
36 for (const auto& it : node->GetInPorts()) {
37 GenerateNodeSeq(nodeVec, it->Peer()->GetNode());
38 }
39 }
40
Update(const std::shared_ptr<Pipeline> & p)41 RetCode StreamPipelineDispatcher::Update(const std::shared_ptr<Pipeline>& p)
42 {
43 std::vector<std::shared_ptr<INode>> sink;
44 for (auto it = p->nodes_.rbegin(); it < p->nodes_.rend(); it++) {
45 if ((*it)->GetNumberOfInPorts() == 1 && (*it)->GetNumberOfOutPorts() == 0) {
46 sink.push_back(*it);
47 }
48 }
49
50 std::unordered_map<int, std::vector<std::shared_ptr<INode>>> seqNode;
51 for (const auto& it : sink) {
52 auto inPorts = it->GetInPorts();
53 if (!inPorts.empty()) {
54 // sink node has only one port, and it is a in-port
55 GenerateNodeSeq(seqNode[inPorts[0]->GetStreamId()], it);
56 }
57 }
58
59 for (auto& [id, pipe] : seqNode) {
60 CutUselessBranch(id, pipe);
61 }
62
63 std::swap(seqNode_, seqNode);
64 CAMERA_LOGI("------------------------Node Seq(UpStream) Dump Begin-------------\n");
65 for (auto [ss, vv] : seqNode_) {
66 CAMERA_LOGI("sink stream id:%{public}d \n", ss);
67 for (auto it : vv) {
68 CAMERA_LOGI("seq node name:%{public}s\n", it->GetName().c_str());
69 }
70 }
71 CAMERA_LOGI("------------------------Node Seq(UpStream) Dump End-------------\n");
72 return RC_OK;
73 }
74
CutUselessBranch(int32_t streamId,std::vector<std::shared_ptr<INode>> & branch)75 void StreamPipelineDispatcher::CutUselessBranch(int32_t streamId, std::vector<std::shared_ptr<INode>>& branch)
76 {
77 auto it = std::find_if(branch.begin(), branch.end(), [streamId](const std::shared_ptr<INode> node) {
78 auto ports = node->GetOutPorts();
79 bool isSameBranch = true;
80 for (auto port : ports) {
81 if (port->GetStreamId() == streamId) {
82 return false;
83 }
84 isSameBranch = false;
85 }
86 return !isSameBranch;
87 });
88 if (it == branch.end()) {
89 return;
90 }
91 branch.erase(it, branch.end());
92
93 return;
94 }
95
Prepare(const int32_t streamId)96 RetCode StreamPipelineDispatcher::Prepare(const int32_t streamId)
97 {
98 if (seqNode_.count(streamId) == 0) {
99 return RC_ERROR;
100 }
101
102 RetCode re = RC_OK;
103 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
104 CAMERA_LOGV("init node %{public}s begin", (*it)->GetName().c_str());
105 re = (*it)->Init(streamId) | re;
106 CAMERA_LOGV("init node %{public}s end", (*it)->GetName().c_str());
107 }
108 return re;
109 }
110
Start(const int32_t streamId)111 RetCode StreamPipelineDispatcher::Start(const int32_t streamId)
112 {
113 if (seqNode_.count(streamId) == 0) {
114 return RC_ERROR;
115 }
116
117 RetCode re = RC_OK;
118 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
119 CAMERA_LOGV("start node %{public}s begin", (*it)->GetName().c_str());
120 re = (*it)->Start(streamId) | re;
121 CAMERA_LOGV("start node %{public}s end", (*it)->GetName().c_str());
122 }
123 return re;
124 }
125
Config(const int32_t streamId,const CaptureMeta & meta)126 RetCode StreamPipelineDispatcher::Config(const int32_t streamId, const CaptureMeta& meta)
127 {
128 if (seqNode_.count(streamId) == 0) {
129 return RC_ERROR;
130 }
131
132 RetCode re = RC_OK;
133 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
134 re = (*it)->Config(streamId, meta) | re;
135 }
136 return re;
137 }
138
Flush(const int32_t streamId)139 RetCode StreamPipelineDispatcher::Flush(const int32_t streamId)
140 {
141 if (seqNode_.count(streamId) == 0) {
142 return RC_ERROR;
143 }
144
145 RetCode re = RC_OK;
146 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
147 CAMERA_LOGV("flush node %{public}s begin", (*it)->GetName().c_str());
148 re = (*it)->Flush(streamId) | re;
149 CAMERA_LOGV("flush node %{public}s end", (*it)->GetName().c_str());
150 }
151 return re;
152 }
153
Stop(const int32_t streamId)154 RetCode StreamPipelineDispatcher::Stop(const int32_t streamId)
155 {
156 if (seqNode_.count(streamId) == 0) {
157 return RC_OK;
158 }
159
160 RetCode re = RC_OK;
161 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
162 CAMERA_LOGV("stop node %{public}s begin", (*it)->GetName().c_str());
163 re = (*it)->Stop(streamId) | re;
164 CAMERA_LOGV("stop node %{public}s end", (*it)->GetName().c_str());
165 }
166 return re;
167 }
168
Capture(const int32_t streamId,const int32_t captureId)169 RetCode StreamPipelineDispatcher::Capture(const int32_t streamId, const int32_t captureId)
170 {
171 if (seqNode_.count(streamId) == 0) {
172 return RC_ERROR;
173 }
174
175 RetCode re = RC_OK;
176 for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
177 re = (*it)->Capture(streamId, captureId) | re;
178 }
179
180 return re;
181 }
182
CancelCapture(const int32_t streamId)183 RetCode StreamPipelineDispatcher::CancelCapture(const int32_t streamId)
184 {
185 if (seqNode_.count(streamId) == 0) {
186 return RC_ERROR;
187 }
188
189 RetCode re = RC_OK;
190 for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
191 re = (*it)->CancelCapture(streamId) | re;
192 }
193
194 return re;
195 }
196
Destroy(const int32_t streamId)197 RetCode StreamPipelineDispatcher::Destroy(const int32_t streamId)
198 {
199 auto it = seqNode_.find(streamId);
200 if (it == seqNode_.end()) {
201 CAMERA_LOGV("pipeline for stream [id:%{public}d] doesn't exists, no need to destroy.", streamId);
202 return RC_OK;
203 }
204 seqNode_.erase(streamId);
205
206 return RC_OK;
207 }
208
GetNode(const int32_t streamId,const std::string name)209 std::shared_ptr<INode> StreamPipelineDispatcher::GetNode(const int32_t streamId, const std::string name)
210 {
211 if (seqNode_.count(streamId) == 0) {
212 return nullptr;
213 }
214
215 std::shared_ptr<INode> node = nullptr;
216 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
217 if (name == (*it)->GetName().substr(0, 3)) { // 3:Copy length
218 node = *it;
219 }
220 }
221 return node;
222 }
223 }
224