1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 * http://www.apache.org/licenses/LICENSE-2.0
7 * Unless required by applicable law or agreed to in writing, software
8 * distributed under the License is distributed on an "AS IS" BASIS,
9 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10 * See the License for the specific language governing permissions and
11 * limitations under the License.
12 */
13
14 #include "stream_pipeline_dispatcher.h"
15
16 namespace OHOS::Camera {
17
Create()18 std::unique_ptr<StreamPipelineDispatcher> StreamPipelineDispatcher::Create()
19 {
20 return std::make_unique<StreamPipelineDispatcher>();
21 }
22
GenerateNodeSeq(std::vector<std::shared_ptr<INode>> & nodeVec,const std::shared_ptr<INode> & node)23 void StreamPipelineDispatcher::GenerateNodeSeq(std::vector<std::shared_ptr<INode>>& nodeVec,
24 const std::shared_ptr<INode>& node)
25 {
26 if (node != nullptr) {
27 nodeVec.push_back(node);
28 } else {
29 return;
30 }
31
32 if (node->GetNumberOfInPorts() == 0) {
33 return;
34 }
35
36 for (const auto& it : node->GetInPorts()) {
37 GenerateNodeSeq(nodeVec, it->Peer()->GetNode());
38 }
39 }
40
Update(const std::shared_ptr<Pipeline> & p)41 RetCode StreamPipelineDispatcher::Update(const std::shared_ptr<Pipeline>& p)
42 {
43 std::vector<std::shared_ptr<INode>> sink;
44 for (auto it = p->nodes_.rbegin(); it < p->nodes_.rend(); it++) {
45 if ((*it)->GetNumberOfInPorts() == 1 && (*it)->GetNumberOfOutPorts() == 0) {
46 sink.push_back(*it);
47 }
48 }
49
50 std::unordered_map<int, std::vector<std::shared_ptr<INode>>> seqNode;
51 for (const auto& it : sink) {
52 auto inPorts = it->GetInPorts();
53 if (!inPorts.empty()) {
54 // sink node has only one port, and it is a in-port
55 GenerateNodeSeq(seqNode[inPorts[0]->GetStreamId()], it);
56 }
57 }
58
59 for (auto& [id, pipe] : seqNode) {
60 CutUselessBranch(id, pipe);
61 }
62
63 std::swap(seqNode_, seqNode);
64 CAMERA_LOGI("------------------------Node Seq(UpStream) Dump Begin-------------\n");
65 for (auto [ss, vv] : seqNode_) {
66 CAMERA_LOGI("sink stream id:%{public}d \n", ss);
67 for (auto it : vv) {
68 it->wide_ = p->wide_;
69 it->high_ = p->high_;
70 CAMERA_LOGI("seq node name:%{public}s\n", it->GetName().c_str());
71 }
72 }
73 CAMERA_LOGI("------------------------Node Seq(UpStream) Dump End-------------\n");
74
75 SetDispatcherCallback();
76
77 return RC_OK;
78 }
79
SetDispatcherCallback()80 RetCode StreamPipelineDispatcher::SetDispatcherCallback()
81 {
82 RetCode ret = RC_OK;
83 for (auto iter = seqNode_.begin(); iter != seqNode_.end(); iter++) {
84 for (auto it = iter->second.rbegin(); it != iter->second.rend(); it++) {
85 ret = (*it)->SetCallback() | ret;
86 }
87 }
88 return ret;
89 }
90
CutUselessBranch(int32_t streamId,std::vector<std::shared_ptr<INode>> & branch)91 void StreamPipelineDispatcher::CutUselessBranch(int32_t streamId, std::vector<std::shared_ptr<INode>>& branch)
92 {
93 auto it = std::find_if(branch.begin(), branch.end(), [streamId](const std::shared_ptr<INode> node) {
94 auto ports = node->GetOutPorts();
95 bool isSameBranch = true;
96 for (auto port : ports) {
97 if (port->GetStreamId() == streamId) {
98 return false;
99 }
100 isSameBranch = false;
101 }
102 return !isSameBranch;
103 });
104 if (it == branch.end()) {
105 return;
106 }
107 branch.erase(it, branch.end());
108
109 return;
110 }
111
Prepare(const int32_t streamId)112 RetCode StreamPipelineDispatcher::Prepare(const int32_t streamId)
113 {
114 if (seqNode_.count(streamId) == 0) {
115 return RC_ERROR;
116 }
117
118 RetCode ret = RC_OK;
119 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
120 CAMERA_LOGV("init node %{public}s begin", (*it)->GetName().c_str());
121 ret = (*it)->Init(streamId) | ret;
122 CAMERA_LOGV("init node %{public}s end", (*it)->GetName().c_str());
123 }
124 return ret;
125 }
126
Start(const int32_t streamId)127 RetCode StreamPipelineDispatcher::Start(const int32_t streamId)
128 {
129 if (seqNode_.count(streamId) == 0) {
130 return RC_ERROR;
131 }
132
133 RetCode ret = RC_OK;
134 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
135 CAMERA_LOGV("start node %{public}s begin", (*it)->GetName().c_str());
136 ret = (*it)->Start(streamId) | ret;
137 CAMERA_LOGV("start node %{public}s end", (*it)->GetName().c_str());
138 }
139 return ret;
140 }
141
Config(const int32_t streamId,const CaptureMeta & meta)142 RetCode StreamPipelineDispatcher::Config(const int32_t streamId, const CaptureMeta& meta)
143 {
144 if (seqNode_.count(streamId) == 0) {
145 return RC_ERROR;
146 }
147
148 RetCode ret = RC_OK;
149 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
150 ret = (*it)->Config(streamId, meta) | ret;
151 }
152 return ret;
153 }
154
Flush(const int32_t streamId)155 RetCode StreamPipelineDispatcher::Flush(const int32_t streamId)
156 {
157 if (seqNode_.count(streamId) == 0) {
158 return RC_ERROR;
159 }
160
161 RetCode ret = RC_OK;
162 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
163 CAMERA_LOGV("flush node %{public}s begin", (*it)->GetName().c_str());
164 ret = (*it)->Flush(streamId) | ret;
165 CAMERA_LOGV("flush node %{public}s end", (*it)->GetName().c_str());
166 }
167 return ret;
168 }
169
SetCallback(const MetaDataCb cb)170 void StreamPipelineDispatcher::SetCallback(const MetaDataCb cb)
171 {
172 CAMERA_LOGI("StreamPipelineDispatcher line: %{public}d", __LINE__);
173 metaDataCb_ = cb;
174 }
175
Stop(const int32_t streamId)176 RetCode StreamPipelineDispatcher::Stop(const int32_t streamId)
177 {
178 if (seqNode_.count(streamId) == 0) {
179 return RC_OK;
180 }
181
182 RetCode ret = RC_OK;
183 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
184 CAMERA_LOGV("stop node %{public}s begin", (*it)->GetName().c_str());
185 ret = (*it)->Stop(streamId) | ret;
186 CAMERA_LOGV("stop node %{public}s end", (*it)->GetName().c_str());
187 }
188 return ret;
189 }
190
Capture(const int32_t streamId,const int32_t captureId)191 RetCode StreamPipelineDispatcher::Capture(const int32_t streamId, const int32_t captureId)
192 {
193 if (seqNode_.count(streamId) == 0) {
194 return RC_ERROR;
195 }
196
197 RetCode ret = RC_OK;
198 for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
199 ret = (*it)->Capture(streamId, captureId) | ret;
200 }
201
202 return ret;
203 }
204
CancelCapture(const int32_t streamId)205 RetCode StreamPipelineDispatcher::CancelCapture(const int32_t streamId)
206 {
207 if (seqNode_.count(streamId) == 0) {
208 return RC_ERROR;
209 }
210
211 RetCode ret = RC_OK;
212 for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
213 ret = (*it)->CancelCapture(streamId) | ret;
214 }
215
216 return ret;
217 }
218
Destroy(const int32_t streamId)219 RetCode StreamPipelineDispatcher::Destroy(const int32_t streamId)
220 {
221 auto it = seqNode_.find(streamId);
222 if (it == seqNode_.end()) {
223 CAMERA_LOGV("pipeline for stream [id:%{public}d] doesn't exists, no need to destroy.", streamId);
224 return RC_OK;
225 }
226 seqNode_.erase(streamId);
227
228 return RC_OK;
229 }
230
GetNode(const int32_t streamId,const std::string name)231 std::shared_ptr<INode> StreamPipelineDispatcher::GetNode(const int32_t streamId, const std::string name)
232 {
233 if (seqNode_.count(streamId) == 0) {
234 return nullptr;
235 }
236
237 std::shared_ptr<INode> node = nullptr;
238 for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
239 if (name == (*it)->GetName().substr(0, 3)) { // 3:Copy length
240 node = *it;
241 }
242 }
243 return node;
244 }
245 }
246