1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "offline_pipeline.h"
17 #include "buffer_manager.h"
18 #include "ibuffer_pool.h"
19 #include <vector>
20
21 namespace OHOS::Camera {
OfflinePipeline()22 OfflinePipeline::OfflinePipeline() {}
23
~OfflinePipeline()24 OfflinePipeline::~OfflinePipeline()
25 {
26 StopProcess();
27 bufferCache_.clear();
28 }
29
StartProcess()30 RetCode OfflinePipeline::StartProcess()
31 {
32 running_ = true;
33 processThread_ = new std::thread([this]() {
34 prctl(PR_SET_NAME, "offlinepipeline");
35 while (running_) {
36 HandleBuffers();
37 }
38 });
39 if (processThread_ == nullptr) {
40 return RC_ERROR;
41 }
42 return RC_OK;
43 }
44
StopProcess()45 RetCode OfflinePipeline::StopProcess()
46 {
47 if (processThread_ == nullptr) {
48 CAMERA_LOGE("cannot stop.");
49 return RC_ERROR;
50 }
51
52 if (running_.load() == false) {
53 return RC_OK;
54 }
55
56 running_ = false;
57 cv_.notify_one();
58 processThread_->join();
59 delete processThread_;
60 processThread_ = nullptr;
61 return RC_OK;
62 }
63
BindOfflineStreamCallback(std::function<void (std::shared_ptr<IBuffer> &)> & callback)64 void OfflinePipeline::BindOfflineStreamCallback(std::function<void(std::shared_ptr<IBuffer>&)>& callback)
65 {
66 std::lock_guard<std::mutex> l(cbLock_);
67 callback_ = callback;
68
69 return;
70 }
71
SwitchToOfflineMode()72 void OfflinePipeline::SwitchToOfflineMode()
73 {
74 offlineMode_ = true;
75 }
76
CancelCapture(int32_t captureId)77 RetCode OfflinePipeline::CancelCapture(int32_t captureId)
78 {
79 CAMERA_LOGE("cancel capture begin");
80 if (bufferCache_.empty()) {
81 CAMERA_LOGE("cancel capture failed, capture id = %{public}d doesn't exist", captureId);
82 return RC_OK;
83 }
84
85 std::vector<std::shared_ptr<IBuffer>> cache;
86 {
87 std::unique_lock<std::mutex> l(queueLock_);
88 auto it = std::find_if(bufferCache_.begin(), bufferCache_.end(),
89 [&captureId](const std::vector<std::shared_ptr<IBuffer>>& c) {
90 for (auto b : c) {
91 if (b->GetCaptureId() == captureId) {
92 return true;
93 }
94 }
95 return false;
96 });
97 if (it == bufferCache_.end()) {
98 CAMERA_LOGE("cancel capture failed, capture id = %{public}d doesn't exist", captureId);
99 return RC_OK;
100 }
101 cache = *it;
102 bufferCache_.erase(it);
103 }
104 for (auto it : cache) {
105 it->SetBufferStatus(CAMERA_BUFFER_STATUS_DROP);
106 }
107 DeliverCancelCache(cache);
108 CAMERA_LOGE("cancel capture end");
109 return RC_OK;
110 }
111
FlushOfflineStream()112 RetCode OfflinePipeline::FlushOfflineStream()
113 {
114 if (!offlineMode_.load()) {
115 CAMERA_LOGE("can't flush in online mode");
116 return RC_ERROR;
117 }
118
119 if (!bufferCache_.empty()) {
120 std::unique_lock<std::mutex> l(queueLock_);
121 while (!bufferCache_.empty()) {
122 auto cache = bufferCache_.front();
123 bufferCache_.pop_front();
124
125 for (auto it : cache) {
126 it->SetBufferStatus(CAMERA_BUFFER_STATUS_DROP);
127 }
128 DeliverCancelCache(cache);
129 }
130 }
131
132 return RC_OK;
133 }
134
ReceiveCache(std::vector<std::shared_ptr<IBuffer>> & buffers)135 void OfflinePipeline::ReceiveCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
136 {
137 if (!buffers.empty() && buffers[0]->GetBufferStatus() != CAMERA_BUFFER_STATUS_OK) {
138 DeliverCancelCache(buffers);
139 return;
140 }
141
142 std::unique_lock<std::mutex> l(queueLock_);
143 bufferCache_.emplace_back(buffers);
144 cv_.notify_one();
145
146 return;
147 }
148
HandleBuffers()149 void OfflinePipeline::HandleBuffers()
150 {
151 if (bufferCache_.empty()) {
152 std::unique_lock<std::mutex> l(queueLock_);
153 if (bufferCache_.empty()) {
154 cv_.wait(l, [this] { return !(running_.load() && bufferCache_.empty()); });
155 }
156 }
157
158 if (running_ == false) {
159 return;
160 }
161
162 std::vector<std::shared_ptr<IBuffer>> cache = {};
163 if (!bufferCache_.empty()) {
164 std::unique_lock<std::mutex> l(queueLock_);
165 if (!bufferCache_.empty()) {
166 cache = bufferCache_.front();
167 bufferCache_.pop_front();
168 }
169 }
170
171 if (cache.empty()) {
172 return;
173 }
174 ProcessCache(cache);
175
176 return;
177 }
178
ProcessCache(std::vector<std::shared_ptr<IBuffer>> & buffers)179 void OfflinePipeline::ProcessCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
180 {
181 DeliverCache(buffers);
182 return;
183 }
DeliverCacheCheck(std::vector<std::shared_ptr<IBuffer>> & buffers)184 void OfflinePipeline::DeliverCacheCheck(std::vector<std::shared_ptr<IBuffer>>& buffers)
185 {
186 for (auto it : buffers) {
187 if (it == nullptr) {
188 continue;
189 }
190 auto bufferManager = BufferManager::GetInstance();
191 if (bufferManager == nullptr) {
192 CAMERA_LOGE("can't get buffer manager");
193 continue;
194 }
195 auto bufferPool = bufferManager->GetBufferPool(it->GetPoolId());
196 if (bufferPool == nullptr) {
197 CAMERA_LOGE("can't get buffer pool");
198 return;
199 }
200 bufferPool->ReturnBuffer(it);
201 }
202 }
DeliverCache(std::vector<std::shared_ptr<IBuffer>> & buffers)203 void OfflinePipeline::DeliverCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
204 {
205 DeliverCacheCheck(buffers);
206 if (offlineMode_.load()) {
207 std::shared_ptr<IBuffer> nullBuffer = nullptr;
208 DeliverOfflineBuffer(nullBuffer);
209 }
210 return;
211 }
212
DeliverCancelCache(std::vector<std::shared_ptr<IBuffer>> & buffers)213 void OfflinePipeline::DeliverCancelCache(std::vector<std::shared_ptr<IBuffer>>& buffers)
214 {
215 DeliverCache(buffers);
216 return;
217 }
218
DeliverOfflineBuffer(std::shared_ptr<IBuffer> & buffer)219 void OfflinePipeline::DeliverOfflineBuffer(std::shared_ptr<IBuffer>& buffer)
220 {
221 if (!offlineMode_.load()) {
222 CAMERA_LOGE("cannot deliver buffer in online mode");
223 return;
224 }
225
226 if (callback_ == nullptr) {
227 CAMERA_LOGE("cannot deliver offline buffer, callback_ is null");
228 return;
229 }
230
231 callback_(buffer);
232
233 return;
234 }
235
CacheQueueDry()236 bool OfflinePipeline::CacheQueueDry()
237 {
238 std::unique_lock<std::mutex> l(queueLock_);
239 return bufferCache_.empty();
240 }
241
CheckOwnerOfCaptureId(int32_t captureId)242 bool OfflinePipeline::CheckOwnerOfCaptureId(int32_t captureId)
243 {
244 std::unique_lock<std::mutex> l(queueLock_);
245 for (auto it : bufferCache_) {
246 for (auto buffer : it) {
247 if (captureId == buffer->GetCaptureId()) {
248 return true;
249 }
250 }
251 }
252 return false;
253 }
254 } // namespace OHOS::Camera
255