• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *     http://www.apache.org/licenses/LICENSE-2.0
7  * Unless required by applicable law or agreed to in writing, software
8  * distributed under the License is distributed on an "AS IS" BASIS,
9  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10  * See the License for the specific language governing permissions and
11  * limitations under the License.
12  */
13 
14 #include "stream_pipeline_dispatcher.h"
15 
16 namespace OHOS::Camera {
17 
Create()18 std::unique_ptr<StreamPipelineDispatcher> StreamPipelineDispatcher::Create()
19 {
20     return std::make_unique<StreamPipelineDispatcher>();
21 }
22 
GenerateNodeSeq(std::vector<std::shared_ptr<INode>> & nodeVec,const std::shared_ptr<INode> & node)23 void StreamPipelineDispatcher::GenerateNodeSeq(std::vector<std::shared_ptr<INode>>& nodeVec,
24     const std::shared_ptr<INode>& node)
25 {
26     if (node != nullptr) {
27         nodeVec.push_back(node);
28     } else {
29         return;
30     }
31 
32     if (node->GetNumberOfInPorts() == 0) {
33         return;
34     }
35 
36     for (const auto& it : node->GetInPorts()) {
37         GenerateNodeSeq(nodeVec, it->Peer()->GetNode());
38     }
39 }
40 
Update(const std::shared_ptr<Pipeline> & p)41 RetCode StreamPipelineDispatcher::Update(const std::shared_ptr<Pipeline>& p)
42 {
43     std::vector<std::shared_ptr<INode>> sink;
44     for (auto it = p->nodes_.rbegin(); it < p->nodes_.rend(); it++) {
45         if ((*it)->GetNumberOfInPorts() == 1 && (*it)->GetNumberOfOutPorts() == 0) {
46             sink.push_back(*it);
47         }
48     }
49 
50     std::unordered_map<int, std::vector<std::shared_ptr<INode>>> seqNode;
51     for (const auto& it : sink) {
52         auto inPorts = it->GetInPorts();
53         if (!inPorts.empty()) {
54             // sink node has only one port, and it is a in-port
55             GenerateNodeSeq(seqNode[inPorts[0]->GetStreamId()], it);
56         }
57     }
58 
59     for (auto& [id, pipe] : seqNode) {
60         CutUselessBranch(id, pipe);
61     }
62 
63     std::swap(seqNode_, seqNode);
64     CAMERA_LOGI("------------------------Node Seq(UpStream) Dump Begin-------------\n");
65     for (auto [ss, vv] : seqNode_) {
66         CAMERA_LOGI("sink stream id:%{public}d \n", ss);
67         for (auto it : vv) {
68             it->wide_ = static_cast<int32_t>(p->maxWidth_);
69             it->high_ = static_cast<int32_t>(p->maxHeight_);
70             CAMERA_LOGI("seq node name:%{public}s, %{public}d * %{public}d\n",
71                 it->GetName().c_str(), it->wide_, it->high_);
72         }
73     }
74     CAMERA_LOGI("------------------------Node Seq(UpStream) Dump End-------------\n");
75 
76     SetDispatcherCallback();
77 
78     return RC_OK;
79 }
80 
SetDispatcherCallback()81 RetCode StreamPipelineDispatcher::SetDispatcherCallback()
82 {
83     RetCode ret = RC_OK;
84     for (auto iter = seqNode_.begin(); iter != seqNode_.end(); iter++) {
85         for (auto it = iter->second.rbegin(); it != iter->second.rend(); it++) {
86             ret = (*it)->SetCallback() | ret;
87         }
88     }
89     return ret;
90 }
91 
CutUselessBranch(int32_t streamId,std::vector<std::shared_ptr<INode>> & branch)92 void StreamPipelineDispatcher::CutUselessBranch(int32_t streamId, std::vector<std::shared_ptr<INode>>& branch)
93 {
94     auto it = std::find_if(branch.begin(), branch.end(), [streamId](const std::shared_ptr<INode> node) {
95         auto ports = node->GetOutPorts();
96         bool isSameBranch = true;
97         for (auto port : ports) {
98             if (port->GetStreamId() == streamId) {
99                 return false;
100             }
101             isSameBranch = false;
102         }
103         return !isSameBranch;
104     });
105     if (it == branch.end()) {
106         return;
107     }
108     branch.erase(it, branch.end());
109 
110     return;
111 }
112 
Prepare(const int32_t streamId)113 RetCode StreamPipelineDispatcher::Prepare(const int32_t streamId)
114 {
115     if (seqNode_.count(streamId) == 0) {
116         return RC_ERROR;
117     }
118 
119     RetCode ret = RC_OK;
120     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
121         CAMERA_LOGV("init node %{public}s begin", (*it)->GetName().c_str());
122         ret = (*it)->Init(streamId) | ret;
123         CAMERA_LOGV("init node %{public}s end", (*it)->GetName().c_str());
124     }
125     return ret;
126 }
127 
Start(const int32_t streamId)128 RetCode StreamPipelineDispatcher::Start(const int32_t streamId)
129 {
130     if (seqNode_.count(streamId) == 0) {
131         return RC_ERROR;
132     }
133 
134     RetCode ret = RC_OK;
135     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
136         CAMERA_LOGV("start node %{public}s begin", (*it)->GetName().c_str());
137         ret = (*it)->Start(streamId) | ret;
138         CAMERA_LOGV("start node %{public}s end", (*it)->GetName().c_str());
139     }
140     return ret;
141 }
142 
Config(const int32_t streamId,const CaptureMeta & meta)143 RetCode StreamPipelineDispatcher::Config(const int32_t streamId, const CaptureMeta& meta)
144 {
145     if (seqNode_.count(streamId) == 0) {
146         return RC_ERROR;
147     }
148 
149     RetCode ret = RC_OK;
150     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
151         ret = (*it)->Config(streamId, meta) | ret;
152     }
153     return ret;
154 }
155 
Flush(const int32_t streamId)156 RetCode StreamPipelineDispatcher::Flush(const int32_t streamId)
157 {
158     if (seqNode_.count(streamId) == 0) {
159         return RC_ERROR;
160     }
161 
162     RetCode ret = RC_OK;
163     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
164         CAMERA_LOGV("flush node %{public}s begin", (*it)->GetName().c_str());
165         ret = (*it)->Flush(streamId) | ret;
166         CAMERA_LOGV("flush node %{public}s end", (*it)->GetName().c_str());
167     }
168     return ret;
169 }
170 
SetCallback(const MetaDataCb cb)171 void StreamPipelineDispatcher::SetCallback(const MetaDataCb cb)
172 {
173     CAMERA_LOGI("StreamPipelineDispatcher line: %{public}d", __LINE__);
174     metaDataCb_ = cb;
175 }
176 
Stop(const int32_t streamId)177 RetCode StreamPipelineDispatcher::Stop(const int32_t streamId)
178 {
179     if (seqNode_.count(streamId) == 0) {
180         return RC_OK;
181     }
182 
183     RetCode ret = RC_OK;
184     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
185         CAMERA_LOGV("stop node %{public}s begin", (*it)->GetName().c_str());
186         ret = (*it)->Stop(streamId) | ret;
187         CAMERA_LOGV("stop node %{public}s end", (*it)->GetName().c_str());
188     }
189     return ret;
190 }
191 
Capture(const int32_t streamId,const int32_t captureId)192 RetCode StreamPipelineDispatcher::Capture(const int32_t streamId, const int32_t captureId)
193 {
194     if (seqNode_.count(streamId) == 0) {
195         return RC_ERROR;
196     }
197 
198     RetCode ret = RC_OK;
199     for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
200         ret = (*it)->Capture(streamId, captureId) | ret;
201     }
202 
203     return ret;
204 }
205 
CancelCapture(const int32_t streamId)206 RetCode StreamPipelineDispatcher::CancelCapture(const int32_t streamId)
207 {
208     if (seqNode_.count(streamId) == 0) {
209         return RC_ERROR;
210     }
211 
212     RetCode ret = RC_OK;
213     for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
214         ret = (*it)->CancelCapture(streamId) | ret;
215     }
216 
217     return ret;
218 }
219 
Destroy(const int32_t streamId)220 RetCode StreamPipelineDispatcher::Destroy(const int32_t streamId)
221 {
222     auto it = seqNode_.find(streamId);
223     if (it == seqNode_.end()) {
224         CAMERA_LOGV("pipeline for stream [id:%{public}d] doesn't exists, no need to destroy.", streamId);
225         return RC_OK;
226     }
227     seqNode_.erase(streamId);
228 
229     return RC_OK;
230 }
231 
GetNode(const int32_t streamId,const std::string name)232 std::shared_ptr<INode> StreamPipelineDispatcher::GetNode(const int32_t streamId, const std::string name)
233 {
234     if (seqNode_.count(streamId) == 0) {
235         return nullptr;
236     }
237 
238     std::shared_ptr<INode> node = nullptr;
239     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
240         if (name == (*it)->GetName().substr(0, 3)) {  // 3:Copy length
241             node = *it;
242         }
243     }
244     return node;
245 }
246 }
247