/*
 * Copyright (c) 2021 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *     http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
13 
14 #include "stream_pipeline_dispatcher.h"
15 
16 namespace OHOS::Camera {
17 
Create()18 std::unique_ptr<StreamPipelineDispatcher> StreamPipelineDispatcher::Create()
19 {
20     return std::make_unique<StreamPipelineDispatcher>();
21 }
22 
GenerateNodeSeq(std::vector<std::shared_ptr<INode>> & nodeVec,const std::shared_ptr<INode> & node)23 void StreamPipelineDispatcher::GenerateNodeSeq(std::vector<std::shared_ptr<INode>>& nodeVec,
24             const std::shared_ptr<INode>& node)
25 {
26     if (node != nullptr) {
27         nodeVec.push_back(node);
28     } else {
29         return;
30     }
31 
32     if (node->GetNumberOfInPorts() == 0) {
33         return;
34     }
35 
36     for (const auto& it : node->GetInPorts()) {
37         GenerateNodeSeq(nodeVec, it->Peer()->GetNode());
38     }
39 }
40 
Update(const std::shared_ptr<Pipeline> & p)41 RetCode StreamPipelineDispatcher::Update(const std::shared_ptr<Pipeline>& p)
42 {
43     std::vector<std::shared_ptr<INode>> sink;
44     for (auto it = p->nodes_.rbegin(); it < p->nodes_.rend(); it++) {
45         if ((*it)->GetNumberOfInPorts() == 1 && (*it)->GetNumberOfOutPorts() == 0) {
46             sink.push_back(*it);
47         }
48     }
49 
50     std::unordered_map<int, std::vector<std::shared_ptr<INode>>> seqNode;
51     for (const auto& it : sink) {
52         auto inPorts = it->GetInPorts();
53         if (!inPorts.empty()) {
54             // sink node has only one port, and it is a in-port
55             GenerateNodeSeq(seqNode[inPorts[0]->GetStreamId()], it);
56         }
57     }
58 
59     for (auto& [id, pipe] : seqNode) {
60         CutUselessBranch(id, pipe);
61     }
62 
63     std::swap(seqNode_, seqNode);
64     CAMERA_LOGI("------------------------Node Seq(UpStream) Dump Begin-------------\n");
65     for (auto [ss, vv] : seqNode_) {
66         CAMERA_LOGI("sink stream id:%{public}d \n", ss);
67         for (auto it : vv) {
68             CAMERA_LOGI("seq node name:%{public}s\n", it->GetName().c_str());
69         }
70     }
71     CAMERA_LOGI("------------------------Node Seq(UpStream) Dump End-------------\n");
72     return RC_OK;
73 }
74 
CutUselessBranch(int32_t streamId,std::vector<std::shared_ptr<INode>> & branch)75 void StreamPipelineDispatcher::CutUselessBranch(int32_t streamId, std::vector<std::shared_ptr<INode>>& branch)
76 {
77     auto it = std::find_if(branch.begin(), branch.end(), [streamId](const std::shared_ptr<INode> node) {
78         auto ports = node->GetOutPorts();
79         bool isSameBranch = true;
80         for (auto port : ports) {
81             if (port->GetStreamId() == streamId) {
82                 return false;
83             }
84             isSameBranch = false;
85         }
86         return !isSameBranch;
87     });
88     if (it == branch.end()) {
89         return;
90     }
91     branch.erase(it, branch.end());
92 
93     return;
94 }
95 
Prepare(const int32_t streamId)96 RetCode StreamPipelineDispatcher::Prepare(const int32_t streamId)
97 {
98     if (seqNode_.count(streamId) == 0) {
99         return RC_ERROR;
100     }
101 
102     RetCode re = RC_OK;
103     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
104         CAMERA_LOGV("init node %{public}s begin", (*it)->GetName().c_str());
105         re = (*it)->Init(streamId) | re;
106         CAMERA_LOGV("init node %{public}s end", (*it)->GetName().c_str());
107     }
108     return re;
109 }
110 
Start(const int32_t streamId)111 RetCode StreamPipelineDispatcher::Start(const int32_t streamId)
112 {
113     if (seqNode_.count(streamId) == 0) {
114         return RC_ERROR;
115     }
116 
117     RetCode re = RC_OK;
118     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
119         CAMERA_LOGV("start node %{public}s begin", (*it)->GetName().c_str());
120         re = (*it)->Start(streamId) | re;
121         CAMERA_LOGV("start node %{public}s end", (*it)->GetName().c_str());
122     }
123     return re;
124 }
125 
Config(const int32_t streamId,const CaptureMeta & meta)126 RetCode StreamPipelineDispatcher::Config(const int32_t streamId, const CaptureMeta& meta)
127 {
128     if (seqNode_.count(streamId) == 0) {
129         return RC_ERROR;
130     }
131 
132     RetCode re = RC_OK;
133     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
134         re = (*it)->Config(streamId, meta) | re;
135     }
136     return re;
137 }
138 
Flush(const int32_t streamId)139 RetCode StreamPipelineDispatcher::Flush(const int32_t streamId)
140 {
141     if (seqNode_.count(streamId) == 0) {
142         return RC_ERROR;
143     }
144 
145     RetCode re = RC_OK;
146     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
147         CAMERA_LOGV("flush node %{public}s begin", (*it)->GetName().c_str());
148         re = (*it)->Flush(streamId) | re;
149         CAMERA_LOGV("flush node %{public}s end", (*it)->GetName().c_str());
150     }
151     return re;
152 }
153 
Stop(const int32_t streamId)154 RetCode StreamPipelineDispatcher::Stop(const int32_t streamId)
155 {
156     if (seqNode_.count(streamId) == 0) {
157         return RC_OK;
158     }
159 
160     RetCode re = RC_OK;
161     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
162         CAMERA_LOGV("stop node %{public}s begin", (*it)->GetName().c_str());
163         re = (*it)->Stop(streamId) | re;
164         CAMERA_LOGV("stop node %{public}s end", (*it)->GetName().c_str());
165     }
166     return re;
167 }
168 
Capture(const int32_t streamId,const int32_t captureId)169 RetCode StreamPipelineDispatcher::Capture(const int32_t streamId, const int32_t captureId)
170 {
171     if (seqNode_.count(streamId) == 0) {
172         return RC_ERROR;
173     }
174 
175     RetCode re = RC_OK;
176     for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
177         re = (*it)->Capture(streamId, captureId) | re;
178     }
179 
180     return re;
181 }
182 
CancelCapture(const int32_t streamId)183 RetCode StreamPipelineDispatcher::CancelCapture(const int32_t streamId)
184 {
185     if (seqNode_.count(streamId) == 0) {
186         return RC_ERROR;
187     }
188 
189     RetCode re = RC_OK;
190     for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
191         re = (*it)->CancelCapture(streamId) | re;
192     }
193 
194     return re;
195 }
196 
Destroy(const int32_t streamId)197 RetCode StreamPipelineDispatcher::Destroy(const int32_t streamId)
198 {
199     auto it = seqNode_.find(streamId);
200     if (it == seqNode_.end()) {
201         CAMERA_LOGV("pipeline for stream [id:%{public}d] doesn't exists, no need to destroy.", streamId);
202         return RC_OK;
203     }
204     seqNode_.erase(streamId);
205 
206     return RC_OK;
207 }
208 
GetNode(const int32_t streamId,const std::string name)209 std::shared_ptr<INode> StreamPipelineDispatcher::GetNode(const int32_t streamId, const std::string name)
210 {
211     if (seqNode_.count(streamId) == 0) {
212         return nullptr;
213     }
214 
215     std::shared_ptr<INode> node = nullptr;
216     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
217         if (name == (*it)->GetName().substr(0, 3)) {  // 3:Copy length
218             node = *it;
219         }
220     }
221     return node;
222 }
223 }
224