1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "stream_base.h"
17 #include "buffer_adapter.h"
18 #include "buffer_manager.h"
19 #include "watchdog.h"
20
21 namespace OHOS::Camera {
22 std::map<StreamIntent, std::string> IStream::g_avaliableStreamType = {
23 {PREVIEW, STREAM_INTENT_TO_STRING(PREVIEW)},
24 {VIDEO, STREAM_INTENT_TO_STRING(VIDEO)},
25 {STILL_CAPTURE, STREAM_INTENT_TO_STRING(STILL_CAPTURE)},
26 {POST_VIEW, STREAM_INTENT_TO_STRING(POST_VIEW)},
27 {ANALYZE, STREAM_INTENT_TO_STRING(ANALYZE)},
28 {CUSTOM, STREAM_INTENT_TO_STRING(CUSTOM)},
29 };
30
StreamBase(const int32_t id,const StreamIntent type,std::shared_ptr<IPipelineCore> & p,std::shared_ptr<CaptureMessageOperator> & m)31 StreamBase::StreamBase(const int32_t id,
32 const StreamIntent type,
33 std::shared_ptr<IPipelineCore>& p,
34 std::shared_ptr<CaptureMessageOperator>& m)
35 {
36 streamId_ = id;
37 streamType_ = static_cast<int32_t>(type);
38 pipelineCore_ = p;
39 messenger_ = m;
40 }
41
~StreamBase()42 StreamBase::~StreamBase()
43 {
44 if (state_ == STREAM_STATE_BUSY) {
45 StopStream();
46 }
47
48 if (hostStreamMgr_ != nullptr) {
49 hostStreamMgr_->DestroyHostStream({streamId_});
50 }
51
52 if (pipeline_ != nullptr) {
53 pipeline_->DestroyPipeline({streamId_});
54 }
55 }
56
ConfigStream(StreamConfiguration & config)57 RetCode StreamBase::ConfigStream(StreamConfiguration& config)
58 {
59 if (state_ != STREAM_STATE_IDLE) {
60 return RC_ERROR;
61 }
62
63 streamConfig_ = config;
64 streamConfig_.usage = GetUsage();
65 if (tunnel_ != nullptr) {
66 streamConfig_.tunnelMode = true;
67 }
68 streamConfig_.bufferCount = GetBufferCount();
69 streamConfig_.maxBatchCaptureCount = 1;
70 streamConfig_.maxCaptureCount = 1;
71 // get device cappability to overide configuration
72 return RC_OK;
73 }
74
CommitStream()75 RetCode StreamBase::CommitStream()
76 {
77 if (state_ != STREAM_STATE_IDLE) {
78 return RC_ERROR;
79 }
80
81 CHECK_IF_PTR_NULL_RETURN_VALUE(pipelineCore_, RC_ERROR);
82
83 pipeline_ = pipelineCore_->GetStreamPipelineCore();
84 CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
85
86 hostStreamMgr_ = pipelineCore_->GetHostStreamMgr();
87 CHECK_IF_PTR_NULL_RETURN_VALUE(hostStreamMgr_, RC_ERROR);
88
89 HostStreamInfo info;
90 info.type_ = static_cast<StreamIntent>(streamType_);
91 info.streamId_ = streamId_;
92 info.width_ = streamConfig_.width;
93 info.height_ = streamConfig_.height;
94 info.format_ = streamConfig_.format;
95 info.usage_ = streamConfig_.usage;
96 info.encodeType_ = streamConfig_.encodeType;
97
98 if (streamConfig_.tunnelMode) {
99 BufferManager* mgr = BufferManager::GetInstance();
100 CHECK_IF_PTR_NULL_RETURN_VALUE(mgr, RC_ERROR);
101
102 if (bufferPool_ == nullptr) {
103 poolId_ = mgr->GenerateBufferPoolId();
104 CHECK_IF_EQUAL_RETURN_VALUE(poolId_, 0, RC_ERROR);
105
106 bufferPool_ = mgr->GetBufferPool(poolId_);
107 if (bufferPool_ == nullptr) {
108 CAMERA_LOGE("stream [id:%{public}d] get buffer pool failed.", streamId_);
109 return RC_ERROR;
110 }
111 }
112
113 info.bufferPoolId_ = poolId_;
114 info.bufferCount_ = GetBufferCount();
115 RetCode rc = bufferPool_->Init(streamConfig_.width, streamConfig_.height, streamConfig_.usage,
116 streamConfig_.format, GetBufferCount(), CAMERA_BUFFER_SOURCE_TYPE_EXTERNAL);
117 if (rc != RC_OK) {
118 CAMERA_LOGE("stream [id:%{public}d] initialize buffer pool failed.", streamId_);
119 return RC_ERROR;
120 }
121 }
122
123 RetCode rc = hostStreamMgr_->CreateHostStream(info, [this](std::shared_ptr<IBuffer> buffer) {
124 HandleResult(buffer);
125 return;
126 });
127 if (rc != RC_OK) {
128 CAMERA_LOGE("commit stream [id:%{public}d] to pipeline failed.", streamId_);
129 return RC_ERROR;
130 }
131 CAMERA_LOGI("commit a stream to pipeline id[%{public}d], w[%{public}d], h[%{public}d], poolId[%{public}llu], \
132 encodeType = %{public}d", info.streamId_, info.width_, info.height_, info.bufferPoolId_, info.encodeType_);
133 state_ = STREAM_STATE_ACTIVE;
134
135 return RC_OK;
136 }
137
StartStream()138 RetCode StreamBase::StartStream()
139 {
140 CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
141
142 if (state_ != STREAM_STATE_ACTIVE) {
143 return RC_ERROR;
144 }
145
146 CAMERA_LOGI("start stream [id:%{public}d] begin", streamId_);
147 tunnel_->NotifyStart();
148
149 RetCode rc = pipeline_->Prepare({streamId_});
150 if (rc != RC_OK) {
151 CAMERA_LOGE("pipeline [id:%{public}d] prepare failed", streamId_);
152 return rc;
153 }
154
155 state_ = STREAM_STATE_BUSY;
156 std::string threadName =
157 g_avaliableStreamType[static_cast<StreamIntent>(streamType_)] + "#" + std::to_string(streamId_);
158 handler_ = std::make_unique<std::thread>([this, &threadName] {
159 prctl(PR_SET_NAME, threadName.c_str());
160 while (state_ == STREAM_STATE_BUSY) {
161 HandleRequest();
162 }
163 });
164 if (handler_ == nullptr) {
165 state_ = STREAM_STATE_ACTIVE;
166 return RC_ERROR;
167 }
168
169 rc = pipeline_->Start({streamId_});
170 if (rc != RC_OK) {
171 CAMERA_LOGE("pipeline [%{public}d] start failed", streamId_);
172 return RC_ERROR;
173 }
174 CAMERA_LOGI("start stream [id:%{public}d] end", streamId_);
175
176 return RC_OK;
177 }
178
StopStream()179 RetCode StreamBase::StopStream()
180 {
181 CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
182 if (state_ == STREAM_STATE_IDLE) {
183 CAMERA_LOGI("stream [id:%{public}d], no need to stop", streamId_);
184 return RC_OK;
185 }
186 CAMERA_LOGI("stop stream [id:%{public}d] begin", streamId_);
187
188 state_ = STREAM_STATE_IDLE;
189
190 tunnel_->NotifyStop();
191 cv_.notify_one();
192 if (handler_ != nullptr) {
193 handler_->join();
194 }
195
196 if (!waitingList_.empty()) {
197 auto request = waitingList_.front();
198 if (request != nullptr && request->IsContinous()) {
199 request->Cancel();
200 }
201 }
202 {
203 std::unique_lock<std::mutex> l(wtLock_);
204 waitingList_.clear();
205 }
206
207 RetCode rc = pipeline_->Flush({streamId_});
208 if (rc != RC_OK) {
209 CAMERA_LOGE("stream [id:%{public}d], pipeline flush failed", streamId_);
210 return RC_ERROR;
211 }
212
213 CAMERA_LOGI("stream [id:%{public}d] is waiting buffers returned", streamId_);
214 tunnel_->WaitForAllBufferReturned();
215 CAMERA_LOGI("stream [id:%{public}d], all buffers are returned.", streamId_);
216
217 rc = pipeline_->Stop({streamId_});
218 if (rc != RC_OK) {
219 CAMERA_LOGE("stream [id:%{public}d], pipeline stop failed", streamId_);
220 return RC_ERROR;
221 }
222
223 if (lastRequest_->IsContinous() && !inTransitList_.empty()) {
224 std::shared_ptr<ICaptureMessage> endMessage =
225 std::make_shared<CaptureEndedMessage>(streamId_, lastRequest_->GetCaptureId(),
226 lastRequest_->GetEndTime(), lastRequest_->GetOwnerCount(), tunnel_->GetFrameCount());
227 CAMERA_LOGV("end of stream [%{public}d], ready to send end message", streamId_);
228 messenger_->SendMessage(endMessage);
229 }
230
231 CAMERA_LOGI("stop stream [id:%{public}d] end", streamId_);
232 isFirstRequest = true;
233
234 inTransitList_.clear();
235 tunnel_->CleanBuffers();
236 bufferPool_->ClearBuffers();
237 return RC_OK;
238 }
239
AddRequest(std::shared_ptr<CaptureRequest> & request)240 RetCode StreamBase::AddRequest(std::shared_ptr<CaptureRequest>& request)
241 {
242 CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
243 request->AddOwner(shared_from_this());
244
245 request->SetFirstRequest(false);
246 if (isFirstRequest) {
247 uint32_t n = GetBufferCount();
248 if (!request->IsContinous()) {
249 for (uint32_t i = 0; i < n; i++) {
250 DeliverBuffer();
251 }
252 }
253
254 RetCode rc = StartStream();
255 if (rc != RC_OK) {
256 CAMERA_LOGE("start stream [id:%{public}d] failed", streamId_);
257 return RC_ERROR;
258 }
259 request->SetFirstRequest(true);
260 isFirstRequest = false;
261 }
262
263 {
264 std::unique_lock<std::mutex> l(wtLock_);
265 waitingList_.emplace_back(request);
266 cv_.notify_one();
267 }
268
269 return RC_OK;
270 }
271
CancelRequest(const std::shared_ptr<CaptureRequest> & request)272 RetCode StreamBase::CancelRequest(const std::shared_ptr<CaptureRequest>& request)
273 {
274 CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
275
276 {
277 // We don't care if this request is continious-capture or single-capture, just erase it.
278 // And those requests in inTransitList_ will be removed in HandleResult.
279 std::unique_lock<std::mutex> wl(wtLock_);
280 auto it = std::find(waitingList_.begin(), waitingList_.end(), request);
281 if (it != waitingList_.end()) {
282 waitingList_.erase(it);
283 CAMERA_LOGI("stream [id:%{public}d], cancel request(capture id:%{public}d) success",
284 streamId_, request->GetCaptureId());
285 }
286 }
287
288 if (request->IsContinous()) {
289 // may be this is the last request
290 std::unique_lock<std::mutex> tl(tsLock_);
291 auto it = std::find(inTransitList_.begin(), inTransitList_.end(), request);
292 if (it == inTransitList_.end()) {
293 std::shared_ptr<ICaptureMessage> endMessage =
294 std::make_shared<CaptureEndedMessage>(streamId_, request->GetCaptureId(), request->GetEndTime(),
295 request->GetOwnerCount(), tunnel_->GetFrameCount());
296 CAMERA_LOGV("end of stream [%{public}d], ready to send end message", streamId_);
297 messenger_->SendMessage(endMessage);
298 pipeline_->CancelCapture({streamId_});
299 }
300 }
301 return RC_OK;
302 }
303
HandleRequest()304 void StreamBase::HandleRequest()
305 {
306 if (waitingList_.empty()) {
307 std::unique_lock<std::mutex> l(wtLock_);
308 if (waitingList_.empty()) {
309 cv_.wait(l, [this] { return !(state_ == STREAM_STATE_BUSY && waitingList_.empty()); });
310 }
311 }
312 if (state_ != STREAM_STATE_BUSY) {
313 return;
314 }
315
316 std::shared_ptr<CaptureRequest> request = nullptr;
317 {
318 // keep a copy of continious-capture in waitingList_, unless it's going to be canceled.
319 std::unique_lock<std::mutex> l(wtLock_);
320 if (waitingList_.empty()) {
321 return;
322 }
323 request = waitingList_.front();
324 CHECK_IF_PTR_NULL_RETURN_VOID(request);
325 if (!request->IsContinous()) {
326 waitingList_.pop_front();
327 }
328 }
329 if (request == nullptr) {
330 CAMERA_LOGE("fatal error, stream [%{public}d] request list is not empty, but can't get one", streamId_);
331 return;
332 }
333
334 {
335 std::unique_lock<std::mutex> l(tsLock_);
336 if (request->NeedCancel()) {
337 return;
338 }
339 inTransitList_.emplace_back(request);
340 }
341 request->Process(streamId_);
342
343 return;
344 }
345
Capture(const std::shared_ptr<CaptureRequest> & request)346 RetCode StreamBase::Capture(const std::shared_ptr<CaptureRequest>& request)
347 {
348 CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
349 CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
350
351 RetCode rc = RC_ERROR;
352
353 rc = pipeline_->Config({streamId_}, request->GetCaptureSetting());
354 if (rc != RC_OK) {
355 CAMERA_LOGE("stream [id:%{public}d] config pipeline failed.", streamId_);
356 return RC_ERROR;
357 }
358
359 rc = pipeline_->Capture({streamId_}, request->GetCaptureId());
360 if (rc != RC_OK) {
361 CAMERA_LOGE("stream [id:%{public}d] take a capture failed.", streamId_);
362 return RC_ERROR;
363 }
364
365 if (request->IsFirstOne()) {
366 if (messenger_ == nullptr) {
367 CAMERA_LOGE("stream [id:%{public}d] can't send message, messenger_ is null", streamId_);
368 return RC_ERROR;
369 }
370 std::shared_ptr<ICaptureMessage> startMessage = std::make_shared<CaptureStartedMessage>(
371 streamId_, request->GetCaptureId(), request->GetBeginTime(), request->GetOwnerCount());
372 messenger_->SendMessage(startMessage);
373 request->SetFirstRequest(false);
374 }
375
376 // DeliverBuffer must be called after Capture, or this capture request will miss a buffer.
377 do {
378 rc = DeliverBuffer();
379 } while (rc != RC_OK && state_ == STREAM_STATE_BUSY);
380
381 return RC_OK;
382 }
383
DeliverBuffer()384 RetCode StreamBase::DeliverBuffer()
385 {
386 CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, RC_ERROR);
387 CHECK_IF_PTR_NULL_RETURN_VALUE(bufferPool_, RC_ERROR);
388
389 std::shared_ptr<IBuffer> buffer = tunnel_->GetBuffer();
390 CHECK_IF_PTR_NULL_RETURN_VALUE(buffer, RC_ERROR);
391
392 buffer->SetEncodeType(streamConfig_.encodeType);
393 buffer->SetStreamId(streamId_);
394 bufferPool_->AddBuffer(buffer);
395 CAMERA_LOGI("stream [id:%{public}d] enqueue buffer index:%{public}d", streamId_, buffer->GetIndex());
396 return RC_OK;
397 }
398
HandleResult(std::shared_ptr<IBuffer> & buffer)399 void StreamBase::HandleResult(std::shared_ptr<IBuffer>& buffer)
400 {
401 CHECK_IF_PTR_NULL_RETURN_VOID(buffer);
402 if (buffer->GetBufferStatus() == CAMERA_BUFFER_STATUS_INVALID) {
403 CAMERA_LOGI("stream [id:%{public}d], this buffer(index:%{public}d) has nothing to do with request.", streamId_,
404 buffer->GetIndex());
405 ReceiveBuffer(buffer);
406 return;
407 }
408
409 if (buffer->GetStreamId() != streamId_) {
410 CAMERA_LOGE("fatal error, stream [%{public}d] reveived a wrong buffer, index:%{public}d. \
411 this buffer belongs to stream:%{public}d", streamId_, buffer->GetIndex(), buffer->GetStreamId());
412 return;
413 }
414
415 int32_t captureId = buffer->GetCaptureId();
416 std::shared_ptr<CaptureRequest> request = nullptr;
417 {
418 std::unique_lock<std::mutex> l(tsLock_);
419 for (auto& r : inTransitList_) {
420 if (r == nullptr) {
421 continue;
422 }
423 if (r->GetCaptureId() == captureId) {
424 request = r;
425 }
426 }
427 }
428 if (request == nullptr) {
429 CAMERA_LOGI("stream [id:%{public}d], this buffer(index:%{public}d) has nothing to do with request.",
430 streamId_, buffer->GetIndex());
431 buffer->SetBufferStatus(CAMERA_BUFFER_STATUS_INVALID);
432 ReceiveBuffer(buffer);
433 return;
434 }
435 request->AttachBuffer(buffer);
436 // To synchronize multiple stream, bottom-layer device stream need be synchronized first.
437 request->OnResult(streamId_);
438 lastRequest_ = request;
439
440 return;
441 }
442
OnFrame(const std::shared_ptr<CaptureRequest> & request)443 RetCode StreamBase::OnFrame(const std::shared_ptr<CaptureRequest>& request)
444 {
445 CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
446 CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
447 auto buffer = request->GetAttachedBuffer();
448 CameraBufferStatus status = buffer->GetBufferStatus();
449 if (status != CAMERA_BUFFER_STATUS_OK) {
450 if (status != CAMERA_BUFFER_STATUS_DROP) {
451 std::shared_ptr<ICaptureMessage> errorMessage =
452 std::make_shared<CaptureErrorMessage>(streamId_, request->GetCaptureId(), request->GetEndTime(),
453 request->GetOwnerCount(), static_cast<StreamError>(status));
454 messenger_->SendMessage(errorMessage);
455 }
456 }
457 if (request->NeedShutterCallback() && messenger_ != nullptr) {
458 std::shared_ptr<ICaptureMessage> shutterMessage = std::make_shared<FrameShutterMessage>(
459 streamId_, request->GetCaptureId(), request->GetEndTime(), request->GetOwnerCount());
460 messenger_->SendMessage(shutterMessage);
461 }
462
463 bool isEnded = false;
464 if (!request->IsContinous()) {
465 isEnded = true;
466 } else if (request->NeedCancel()) {
467 isEnded = true;
468 }
469
470 {
471 // inTransitList_ may have multiple copies of continious-capture request, we just need erase one of them.
472 std::unique_lock<std::mutex> l(tsLock_);
473 for (auto it = inTransitList_.begin(); it != inTransitList_.end(); it++) {
474 if ((*it) == request) {
475 inTransitList_.erase(it);
476 break;
477 }
478 }
479
480 if (isEnded) {
481 // if this is the last request of capture, send CaptureEndedMessage.
482 auto it = std::find(inTransitList_.begin(), inTransitList_.end(), request);
483 if (it == inTransitList_.end()) {
484 std::shared_ptr<ICaptureMessage> endMessage =
485 std::make_shared<CaptureEndedMessage>(streamId_, request->GetCaptureId(), request->GetEndTime(),
486 request->GetOwnerCount(), tunnel_->GetFrameCount());
487 CAMERA_LOGV("end of stream [%d], ready to send end message, capture id = %d",
488 streamId_, request->GetCaptureId());
489 messenger_->SendMessage(endMessage);
490 pipeline_->CancelCapture({streamId_});
491 }
492 }
493 }
494
495 ReceiveBuffer(buffer);
496 return RC_OK;
497 }
498
ReceiveBuffer(std::shared_ptr<IBuffer> & buffer)499 RetCode StreamBase::ReceiveBuffer(std::shared_ptr<IBuffer>& buffer)
500 {
501 CHECK_IF_PTR_NULL_RETURN_VALUE(buffer, RC_ERROR);
502 CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, RC_ERROR);
503 CHECK_IF_PTR_NULL_RETURN_VALUE(bufferPool_, RC_ERROR);
504
505 CAMERA_LOGI("stream [id:%{public}d] dequeue buffer index:%{public}d, status:%{public}d",
506 streamId_, buffer->GetIndex(), buffer->GetBufferStatus());
507 bufferPool_->ReturnBuffer(buffer);
508 tunnel_->PutBuffer(buffer);
509 return RC_OK;
510 }
511
GetFrameCount() const512 uint64_t StreamBase::GetFrameCount() const
513 {
514 CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, 0);
515 return tunnel_->GetFrameCount();
516 }
517
AttachStreamTunnel(std::shared_ptr<StreamTunnel> & tunnel)518 RetCode StreamBase::AttachStreamTunnel(std::shared_ptr<StreamTunnel>& tunnel)
519 {
520 if (state_ == STREAM_STATE_BUSY || state_ == STREAM_STATE_OFFLINE) {
521 return RC_ERROR;
522 }
523
524 tunnel_ = tunnel;
525 CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, RC_ERROR);
526 tunnel_->SetBufferCount(GetBufferCount());
527 TunnelConfig config = {streamConfig_.width, streamConfig_.height, streamConfig_.format, streamConfig_.usage};
528 tunnel_->Config(config);
529
530 streamConfig_.tunnelMode = true;
531 return RC_OK;
532 }
533
DetachStreamTunnel()534 RetCode StreamBase::DetachStreamTunnel()
535 {
536 if (state_ == STREAM_STATE_BUSY || state_ == STREAM_STATE_OFFLINE) {
537 return RC_ERROR;
538 }
539
540 tunnel_.reset();
541 streamConfig_.tunnelMode = false;
542
543 state_ = STREAM_STATE_IDLE;
544 return RC_OK;
545 }
546
ChangeToOfflineStream(std::shared_ptr<OfflineStream> offlineStream)547 RetCode StreamBase::ChangeToOfflineStream(std::shared_ptr<OfflineStream> offlineStream)
548 {
549 return RC_OK;
550 }
551
GetUsage()552 uint64_t StreamBase::GetUsage()
553 {
554 return CAMERA_USAGE_SW_WRITE_OFTEN | CAMERA_USAGE_SW_READ_OFTEN | CAMERA_USAGE_MEM_DMA;
555 }
556
GetBufferCount()557 uint32_t StreamBase::GetBufferCount()
558 {
559 return 3; // 3: buffer count
560 }
561
GetStreamAttribute() const562 StreamConfiguration StreamBase::GetStreamAttribute() const
563 {
564 return streamConfig_;
565 }
566
GetStreamId() const567 int32_t StreamBase::GetStreamId() const
568 {
569 return streamId_;
570 }
571
IsRunning() const572 bool StreamBase::IsRunning() const
573 {
574 return state_ == STREAM_STATE_BUSY;
575 }
576
GetTunnelMode() const577 bool StreamBase::GetTunnelMode() const
578 {
579 return streamConfig_.tunnelMode;
580 }
581 } // namespace OHOS::Camera
582