• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *     http://www.apache.org/licenses/LICENSE-2.0
7  * Unless required by applicable law or agreed to in writing, software
8  * distributed under the License is distributed on an "AS IS" BASIS,
9  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10  * See the License for the specific language governing permissions and
11  * limitations under the License.
12  */
13 
14 #include "stream_pipeline_dispatcher.h"
15 
16 namespace OHOS::Camera {
17 
Create()18 std::unique_ptr<StreamPipelineDispatcher> StreamPipelineDispatcher::Create()
19 {
20     return std::make_unique<StreamPipelineDispatcher>();
21 }
22 
GenerateNodeSeq(std::vector<std::shared_ptr<INode>> & nodeVec,const std::shared_ptr<INode> & node)23 void StreamPipelineDispatcher::GenerateNodeSeq(std::vector<std::shared_ptr<INode>>& nodeVec,
24     const std::shared_ptr<INode>& node)
25 {
26     if (node != nullptr) {
27         nodeVec.push_back(node);
28     } else {
29         return;
30     }
31 
32     if (node->GetNumberOfInPorts() == 0) {
33         return;
34     }
35 
36     for (const auto& it : node->GetInPorts()) {
37         GenerateNodeSeq(nodeVec, it->Peer()->GetNode());
38     }
39 }
40 
Update(const std::shared_ptr<Pipeline> & p)41 RetCode StreamPipelineDispatcher::Update(const std::shared_ptr<Pipeline>& p)
42 {
43     std::vector<std::shared_ptr<INode>> sink;
44     for (auto it = p->nodes_.rbegin(); it < p->nodes_.rend(); it++) {
45         if ((*it)->GetNumberOfInPorts() == 1 && (*it)->GetNumberOfOutPorts() == 0) {
46             sink.push_back(*it);
47         }
48     }
49 
50     std::unordered_map<int, std::vector<std::shared_ptr<INode>>> seqNode;
51     for (const auto& it : sink) {
52         auto inPorts = it->GetInPorts();
53         if (!inPorts.empty()) {
54             // sink node has only one port, and it is a in-port
55             GenerateNodeSeq(seqNode[inPorts[0]->GetStreamId()], it);
56         }
57     }
58 
59     for (auto& [id, pipe] : seqNode) {
60         CutUselessBranch(id, pipe);
61     }
62 
63     std::swap(seqNode_, seqNode);
64     CAMERA_LOGI("------------------------Node Seq(UpStream) Dump Begin-------------\n");
65     for (auto [ss, vv] : seqNode_) {
66         CAMERA_LOGI("sink stream id:%{public}d \n", ss);
67         for (auto it : vv) {
68             it->wide_ = p->wide_;
69             it->high_ = p->high_;
70             CAMERA_LOGI("seq node name:%{public}s\n", it->GetName().c_str());
71         }
72     }
73     CAMERA_LOGI("------------------------Node Seq(UpStream) Dump End-------------\n");
74 
75     SetDispatcherCallback();
76 
77     return RC_OK;
78 }
79 
SetDispatcherCallback()80 RetCode StreamPipelineDispatcher::SetDispatcherCallback()
81 {
82     RetCode ret = RC_OK;
83     for (auto iter = seqNode_.begin(); iter != seqNode_.end(); iter++) {
84         for (auto it = iter->second.rbegin(); it != iter->second.rend(); it++) {
85             ret = (*it)->SetCallback() | ret;
86         }
87     }
88     return ret;
89 }
90 
CutUselessBranch(int32_t streamId,std::vector<std::shared_ptr<INode>> & branch)91 void StreamPipelineDispatcher::CutUselessBranch(int32_t streamId, std::vector<std::shared_ptr<INode>>& branch)
92 {
93     auto it = std::find_if(branch.begin(), branch.end(), [streamId](const std::shared_ptr<INode> node) {
94         auto ports = node->GetOutPorts();
95         bool isSameBranch = true;
96         for (auto port : ports) {
97             if (port->GetStreamId() == streamId) {
98                 return false;
99             }
100             isSameBranch = false;
101         }
102         return !isSameBranch;
103     });
104     if (it == branch.end()) {
105         return;
106     }
107     branch.erase(it, branch.end());
108 
109     return;
110 }
111 
Prepare(const int32_t streamId)112 RetCode StreamPipelineDispatcher::Prepare(const int32_t streamId)
113 {
114     if (seqNode_.count(streamId) == 0) {
115         return RC_ERROR;
116     }
117 
118     RetCode ret = RC_OK;
119     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
120         CAMERA_LOGV("init node %{public}s begin", (*it)->GetName().c_str());
121         ret = (*it)->Init(streamId) | ret;
122         CAMERA_LOGV("init node %{public}s end", (*it)->GetName().c_str());
123     }
124     return ret;
125 }
126 
Start(const int32_t streamId)127 RetCode StreamPipelineDispatcher::Start(const int32_t streamId)
128 {
129     if (seqNode_.count(streamId) == 0) {
130         return RC_ERROR;
131     }
132 
133     RetCode ret = RC_OK;
134     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
135         CAMERA_LOGV("start node %{public}s begin", (*it)->GetName().c_str());
136         ret = (*it)->Start(streamId) | ret;
137         CAMERA_LOGV("start node %{public}s end", (*it)->GetName().c_str());
138     }
139     return ret;
140 }
141 
Config(const int32_t streamId,const CaptureMeta & meta)142 RetCode StreamPipelineDispatcher::Config(const int32_t streamId, const CaptureMeta& meta)
143 {
144     if (seqNode_.count(streamId) == 0) {
145         return RC_ERROR;
146     }
147 
148     RetCode ret = RC_OK;
149     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
150         ret = (*it)->Config(streamId, meta) | ret;
151     }
152     return ret;
153 }
154 
Flush(const int32_t streamId)155 RetCode StreamPipelineDispatcher::Flush(const int32_t streamId)
156 {
157     if (seqNode_.count(streamId) == 0) {
158         return RC_ERROR;
159     }
160 
161     RetCode ret = RC_OK;
162     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
163         CAMERA_LOGV("flush node %{public}s begin", (*it)->GetName().c_str());
164         ret = (*it)->Flush(streamId) | ret;
165         CAMERA_LOGV("flush node %{public}s end", (*it)->GetName().c_str());
166     }
167     return ret;
168 }
169 
SetCallback(const MetaDataCb cb)170 void StreamPipelineDispatcher::SetCallback(const MetaDataCb cb)
171 {
172     CAMERA_LOGI("StreamPipelineDispatcher line: %{public}d", __LINE__);
173     metaDataCb_ = cb;
174 }
175 
Stop(const int32_t streamId)176 RetCode StreamPipelineDispatcher::Stop(const int32_t streamId)
177 {
178     if (seqNode_.count(streamId) == 0) {
179         return RC_OK;
180     }
181 
182     RetCode ret = RC_OK;
183     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
184         CAMERA_LOGV("stop node %{public}s begin", (*it)->GetName().c_str());
185         ret = (*it)->Stop(streamId) | ret;
186         CAMERA_LOGV("stop node %{public}s end", (*it)->GetName().c_str());
187     }
188     return ret;
189 }
190 
Capture(const int32_t streamId,const int32_t captureId)191 RetCode StreamPipelineDispatcher::Capture(const int32_t streamId, const int32_t captureId)
192 {
193     if (seqNode_.count(streamId) == 0) {
194         return RC_ERROR;
195     }
196 
197     RetCode ret = RC_OK;
198     for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
199         ret = (*it)->Capture(streamId, captureId) | ret;
200     }
201 
202     return ret;
203 }
204 
CancelCapture(const int32_t streamId)205 RetCode StreamPipelineDispatcher::CancelCapture(const int32_t streamId)
206 {
207     if (seqNode_.count(streamId) == 0) {
208         return RC_ERROR;
209     }
210 
211     RetCode ret = RC_OK;
212     for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
213         ret = (*it)->CancelCapture(streamId) | ret;
214     }
215 
216     return ret;
217 }
218 
Destroy(const int32_t streamId)219 RetCode StreamPipelineDispatcher::Destroy(const int32_t streamId)
220 {
221     auto it = seqNode_.find(streamId);
222     if (it == seqNode_.end()) {
223         CAMERA_LOGV("pipeline for stream [id:%{public}d] doesn't exists, no need to destroy.", streamId);
224         return RC_OK;
225     }
226     seqNode_.erase(streamId);
227 
228     return RC_OK;
229 }
230 
GetNode(const int32_t streamId,const std::string name)231 std::shared_ptr<INode> StreamPipelineDispatcher::GetNode(const int32_t streamId, const std::string name)
232 {
233     if (seqNode_.count(streamId) == 0) {
234         return nullptr;
235     }
236 
237     std::shared_ptr<INode> node = nullptr;
238     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
239         if (name == (*it)->GetName().substr(0, 3)) {  // 3:Copy length
240             node = *it;
241         }
242     }
243     return node;
244 }
245 }
246