• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *     http://www.apache.org/licenses/LICENSE-2.0
7  * Unless required by applicable law or agreed to in writing, software
8  * distributed under the License is distributed on an "AS IS" BASIS,
9  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10  * See the License for the specific language governing permissions and
11  * limitations under the License.
12  */
13 
14 #include "stream_pipeline_dispatcher.h"
15 
16 namespace OHOS::Camera {
17 
Create()18 std::unique_ptr<StreamPipelineDispatcher> StreamPipelineDispatcher::Create()
19 {
20     return std::make_unique<StreamPipelineDispatcher>();
21 }
22 
GenerateNodeSeq(std::vector<std::shared_ptr<INode>> & nodeVec,const std::shared_ptr<INode> & node)23 void StreamPipelineDispatcher::GenerateNodeSeq(std::vector<std::shared_ptr<INode>>& nodeVec,
24     const std::shared_ptr<INode>& node)
25 {
26     if (node != nullptr) {
27         nodeVec.push_back(node);
28     } else {
29         return;
30     }
31 
32     if (node->GetNumberOfInPorts() == 0) {
33         return;
34     }
35 
36     for (const auto& it : node->GetInPorts()) {
37         GenerateNodeSeq(nodeVec, it->Peer()->GetNode());
38     }
39 }
40 
Update(const std::shared_ptr<Pipeline> & p)41 RetCode StreamPipelineDispatcher::Update(const std::shared_ptr<Pipeline>& p)
42 {
43     std::vector<std::shared_ptr<INode>> sink;
44     for (auto it = p->nodes_.rbegin(); it < p->nodes_.rend(); it++) {
45         if ((*it)->GetNumberOfInPorts() == 1 && (*it)->GetNumberOfOutPorts() == 0) {
46             sink.push_back(*it);
47         }
48     }
49 
50     std::unordered_map<int, std::vector<std::shared_ptr<INode>>> seqNode;
51     for (const auto& it : sink) {
52         auto inPorts = it->GetInPorts();
53         if (!inPorts.empty()) {
54             // sink node has only one port, and it is a in-port
55             GenerateNodeSeq(seqNode[inPorts[0]->GetStreamId()], it);
56         }
57     }
58 
59     for (auto& [id, pipe] : seqNode) {
60         CutUselessBranch(id, pipe);
61     }
62 
63     std::swap(seqNode_, seqNode);
64     CAMERA_LOGI("------------------------Node Seq(UpStream) Dump Begin-------------\n");
65     for (auto [ss, vv] : seqNode_) {
66         CAMERA_LOGI("sink stream id:%{public}d \n", ss);
67         for (auto it : vv) {
68             CAMERA_LOGI("seq node name:%{public}s\n", it->GetName().c_str());
69         }
70     }
71     CAMERA_LOGI("------------------------Node Seq(UpStream) Dump End-------------\n");
72 #ifndef CAMERA_BUILT_ON_OHOS_LITE
73     SetDispatcherCallback();
74 #endif
75 
76     return RC_OK;
77 }
78 
SetDispatcherCallback()79 RetCode StreamPipelineDispatcher::SetDispatcherCallback()
80 {
81     CAMERA_LOGI("StreamPipelineDispatcher line: %{public}d", __LINE__);
82     RetCode ret = RC_OK;
83     for (auto iter = seqNode_.begin(); iter != seqNode_.end(); iter++) {
84         for (auto it = iter->second.rbegin(); it != iter->second.rend(); it++) {
85             CAMERA_LOGI("SetCallback node %{public}s begin", (*it)->GetName().c_str());
86             ret = (*it)->SetCallback(metaDataCb_) | ret;
87             CAMERA_LOGI("SetCallback node %{public}s end", (*it)->GetName().c_str());
88         }
89     }
90     return ret;
91 }
92 
CutUselessBranch(int32_t streamId,std::vector<std::shared_ptr<INode>> & branch)93 void StreamPipelineDispatcher::CutUselessBranch(int32_t streamId, std::vector<std::shared_ptr<INode>>& branch)
94 {
95     auto it = std::find_if(branch.begin(), branch.end(), [streamId](const std::shared_ptr<INode> node) {
96         auto ports = node->GetOutPorts();
97         bool isSameBranch = true;
98         for (auto port : ports) {
99             if (port->GetStreamId() == streamId) {
100                 return false;
101             }
102             isSameBranch = false;
103         }
104         return !isSameBranch;
105     });
106     if (it == branch.end()) {
107         return;
108     }
109     branch.erase(it, branch.end());
110 
111     return;
112 }
113 
Prepare(const int32_t streamId)114 RetCode StreamPipelineDispatcher::Prepare(const int32_t streamId)
115 {
116     if (seqNode_.count(streamId) == 0) {
117         return RC_ERROR;
118     }
119 
120     RetCode ret = RC_OK;
121     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
122         CAMERA_LOGV("init node %{public}s begin", (*it)->GetName().c_str());
123         ret = (*it)->Init(streamId) | ret;
124         CAMERA_LOGV("init node %{public}s end", (*it)->GetName().c_str());
125     }
126     return ret;
127 }
128 
Start(const int32_t streamId)129 RetCode StreamPipelineDispatcher::Start(const int32_t streamId)
130 {
131     if (seqNode_.count(streamId) == 0) {
132         return RC_ERROR;
133     }
134 
135     RetCode ret = RC_OK;
136     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
137         CAMERA_LOGV("start node %{public}s begin", (*it)->GetName().c_str());
138         ret = (*it)->Start(streamId) | ret;
139         CAMERA_LOGV("start node %{public}s end", (*it)->GetName().c_str());
140     }
141     return ret;
142 }
143 
Config(const int32_t streamId,const CaptureMeta & meta)144 RetCode StreamPipelineDispatcher::Config(const int32_t streamId, const CaptureMeta& meta)
145 {
146     if (seqNode_.count(streamId) == 0) {
147         return RC_ERROR;
148     }
149 
150     RetCode ret = RC_OK;
151     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
152         ret = (*it)->Config(streamId, meta) | ret;
153     }
154     return ret;
155 }
156 
UpdateSettingsConfig(const CaptureMeta & meta)157 RetCode StreamPipelineDispatcher::UpdateSettingsConfig(const CaptureMeta& meta)
158 {
159     RetCode ret = RC_OK;
160     for (auto iter = seqNode_.begin(); iter != seqNode_.end(); iter++) {
161         for (auto it = iter->second.rbegin(); it != iter->second.rend(); it++) {
162             CAMERA_LOGV("UpdateSettingsConfig node %{public}s begin", (*it)->GetName().c_str());
163             ret = (*it)->UpdateSettingsConfig(meta) | ret;
164             CAMERA_LOGV("UpdateSettingsConfig node %{public}s end", (*it)->GetName().c_str());
165         }
166     }
167     return ret;
168 }
169 
Flush(const int32_t streamId)170 RetCode StreamPipelineDispatcher::Flush(const int32_t streamId)
171 {
172     if (seqNode_.count(streamId) == 0) {
173         return RC_ERROR;
174     }
175 
176     RetCode ret = RC_OK;
177     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
178         CAMERA_LOGV("flush node %{public}s begin", (*it)->GetName().c_str());
179         ret = (*it)->Flush(streamId) | ret;
180         CAMERA_LOGV("flush node %{public}s end", (*it)->GetName().c_str());
181     }
182     return ret;
183 }
184 
SetCallback(const MetaDataCb cb)185 void StreamPipelineDispatcher::SetCallback(const MetaDataCb cb)
186 {
187     CAMERA_LOGI("StreamPipelineDispatcher line: %{public}d", __LINE__);
188     metaDataCb_ = cb;
189 }
190 
Stop(const int32_t streamId)191 RetCode StreamPipelineDispatcher::Stop(const int32_t streamId)
192 {
193     if (seqNode_.count(streamId) == 0) {
194         return RC_OK;
195     }
196 
197     RetCode ret = RC_OK;
198     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
199         CAMERA_LOGV("stop node %{public}s begin", (*it)->GetName().c_str());
200         ret = (*it)->Stop(streamId) | ret;
201         CAMERA_LOGV("stop node %{public}s end", (*it)->GetName().c_str());
202     }
203     return ret;
204 }
205 
Capture(const int32_t streamId,const int32_t captureId)206 RetCode StreamPipelineDispatcher::Capture(const int32_t streamId, const int32_t captureId)
207 {
208     if (seqNode_.count(streamId) == 0) {
209         return RC_ERROR;
210     }
211 
212     RetCode ret = RC_OK;
213     for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
214         ret = (*it)->Capture(streamId, captureId) | ret;
215     }
216 
217     return ret;
218 }
219 
CancelCapture(const int32_t streamId)220 RetCode StreamPipelineDispatcher::CancelCapture(const int32_t streamId)
221 {
222     if (seqNode_.count(streamId) == 0) {
223         return RC_ERROR;
224     }
225 
226     RetCode ret = RC_OK;
227     for (auto it = seqNode_[streamId].begin(); it != seqNode_[streamId].end(); it++) {
228         ret = (*it)->CancelCapture(streamId) | ret;
229     }
230 
231     return ret;
232 }
233 
Destroy(const int32_t streamId)234 RetCode StreamPipelineDispatcher::Destroy(const int32_t streamId)
235 {
236     auto it = seqNode_.find(streamId);
237     if (it == seqNode_.end()) {
238         CAMERA_LOGV("pipeline for stream [id:%{public}d] doesn't exists, no need to destroy.", streamId);
239         return RC_OK;
240     }
241     seqNode_.erase(streamId);
242 
243     return RC_OK;
244 }
245 
GetNode(const int32_t streamId,const std::string name)246 std::shared_ptr<INode> StreamPipelineDispatcher::GetNode(const int32_t streamId, const std::string name)
247 {
248     if (seqNode_.count(streamId) == 0) {
249         return nullptr;
250     }
251 
252     std::shared_ptr<INode> node = nullptr;
253     for (auto it = seqNode_[streamId].rbegin(); it != seqNode_[streamId].rend(); it++) {
254         if (name == (*it)->GetName().substr(0, 3)) {  // 3:Copy length
255             node = *it;
256         }
257     }
258     return node;
259 }
260 }
261