1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "stream_base.h"
17 #include "buffer_adapter.h"
18 #include "buffer_manager.h"
19 #include "watchdog.h"
20
21 namespace OHOS::Camera {
22 std::map<StreamIntent, std::string> IStream::g_availableStreamType = {
23 {PREVIEW, STREAM_INTENT_TO_STRING(PREVIEW)},
24 {VIDEO, STREAM_INTENT_TO_STRING(VIDEO)},
25 {STILL_CAPTURE, STREAM_INTENT_TO_STRING(STILL_CAPTURE)},
26 {POST_VIEW, STREAM_INTENT_TO_STRING(POST_VIEW)},
27 {ANALYZE, STREAM_INTENT_TO_STRING(ANALYZE)},
28 {CUSTOM, STREAM_INTENT_TO_STRING(CUSTOM)},
29 };
30
StreamBase(const int32_t id,const StreamIntent type,std::shared_ptr<IPipelineCore> & p,std::shared_ptr<CaptureMessageOperator> & m)31 StreamBase::StreamBase(const int32_t id,
32 const StreamIntent type,
33 std::shared_ptr<IPipelineCore>& p,
34 std::shared_ptr<CaptureMessageOperator>& m)
35 {
36 streamId_ = id;
37 streamType_ = static_cast<int32_t>(type);
38 pipelineCore_ = p;
39 messenger_ = m;
40 }
41
~StreamBase()42 StreamBase::~StreamBase()
43 {
44 if (state_ == STREAM_STATE_BUSY) {
45 StopStream();
46 }
47
48 if (hostStreamMgr_ != nullptr) {
49 hostStreamMgr_->DestroyHostStream({streamId_});
50 }
51
52 if (pipeline_ != nullptr) {
53 pipeline_->DestroyPipeline({streamId_});
54 }
55 }
56
ConfigStream(StreamConfiguration & config)57 RetCode StreamBase::ConfigStream(StreamConfiguration& config)
58 {
59 if (state_ != STREAM_STATE_IDLE) {
60 return RC_ERROR;
61 }
62
63 streamConfig_ = config;
64 streamConfig_.usage = GetUsage();
65 if (tunnel_ != nullptr) {
66 streamConfig_.tunnelMode = true;
67 }
68 streamConfig_.bufferCount = GetBufferCount();
69 streamConfig_.maxBatchCaptureCount = 1;
70 streamConfig_.maxCaptureCount = 1;
71 // get device cappability to overide configuration
72 return RC_OK;
73 }
74
CommitStream()75 RetCode StreamBase::CommitStream()
76 {
77 if (state_ != STREAM_STATE_IDLE) {
78 return RC_ERROR;
79 }
80
81 CHECK_IF_PTR_NULL_RETURN_VALUE(pipelineCore_, RC_ERROR);
82
83 pipeline_ = pipelineCore_->GetStreamPipelineCore();
84 CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
85
86 hostStreamMgr_ = pipelineCore_->GetHostStreamMgr();
87 CHECK_IF_PTR_NULL_RETURN_VALUE(hostStreamMgr_, RC_ERROR);
88
89 HostStreamInfo info;
90 info.type_ = static_cast<StreamIntent>(streamType_);
91 info.streamId_ = streamId_;
92 info.width_ = streamConfig_.width;
93 info.height_ = streamConfig_.height;
94 info.format_ = streamConfig_.format;
95 info.usage_ = streamConfig_.usage;
96 info.encodeType_ = streamConfig_.encodeType;
97
98 if (streamConfig_.tunnelMode) {
99 BufferManager* mgr = BufferManager::GetInstance();
100 CHECK_IF_PTR_NULL_RETURN_VALUE(mgr, RC_ERROR);
101
102 if (bufferPool_ == nullptr) {
103 poolId_ = mgr->GenerateBufferPoolId();
104 CHECK_IF_EQUAL_RETURN_VALUE(poolId_, 0, RC_ERROR);
105 bufferPool_ = mgr->GetBufferPool(poolId_);
106 if (bufferPool_ == nullptr) {
107 CAMERA_LOGE("stream [id:%{public}d] get buffer pool failed.", streamId_);
108 return RC_ERROR;
109 }
110 }
111
112 info.bufferPoolId_ = poolId_;
113 info.bufferCount_ = GetBufferCount();
114 RetCode rc = bufferPool_->Init(streamConfig_.width, streamConfig_.height, streamConfig_.usage,
115 streamConfig_.format, GetBufferCount(), CAMERA_BUFFER_SOURCE_TYPE_EXTERNAL);
116 if (rc != RC_OK) {
117 CAMERA_LOGE("stream [id:%{public}d] initialize buffer pool failed.", streamId_);
118 return RC_ERROR;
119 }
120 }
121
122 RetCode rc = hostStreamMgr_->CreateHostStream(info, [this](std::shared_ptr<IBuffer> buffer) {
123 HandleResult(buffer);
124 return;
125 });
126 if (rc != RC_OK) {
127 CAMERA_LOGE("commit stream [id:%{public}d] to pipeline failed.", streamId_);
128 return RC_ERROR;
129 }
130 CAMERA_LOGI("commit a stream to pipeline id[%{public}d], w[%{public}d], h[%{public}d], poolId[%{public}llu], \
131 encodeType = %{public}d", info.streamId_, info.width_, info.height_, info.bufferPoolId_, info.encodeType_);
132 state_ = STREAM_STATE_ACTIVE;
133
134 return RC_OK;
135 }
136
StartStream()137 RetCode StreamBase::StartStream()
138 {
139 CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
140
141 if (state_ != STREAM_STATE_ACTIVE) {
142 return RC_ERROR;
143 }
144
145 CAMERA_LOGI("start stream [id:%{public}d] begin", streamId_);
146 tunnel_->NotifyStart();
147
148 RetCode rc = pipeline_->Prepare({streamId_});
149 if (rc != RC_OK) {
150 CAMERA_LOGE("pipeline [id:%{public}d] prepare failed", streamId_);
151 return rc;
152 }
153
154 state_ = STREAM_STATE_BUSY;
155 std::string threadName =
156 g_availableStreamType[static_cast<StreamIntent>(streamType_)] + "#" + std::to_string(streamId_);
157 handler_ = std::make_unique<std::thread>([this, &threadName] {
158 prctl(PR_SET_NAME, threadName.c_str());
159 while (state_ == STREAM_STATE_BUSY) {
160 HandleRequest();
161 }
162 });
163 if (handler_ == nullptr) {
164 state_ = STREAM_STATE_ACTIVE;
165 return RC_ERROR;
166 }
167
168 rc = pipeline_->Start({streamId_});
169 if (rc != RC_OK) {
170 CAMERA_LOGE("pipeline [%{public}d] start failed", streamId_);
171 return RC_ERROR;
172 }
173 CAMERA_LOGI("start stream [id:%{public}d] end", streamId_);
174
175 return RC_OK;
176 }
177
StopStream()178 RetCode StreamBase::StopStream()
179 {
180 CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
181 if (state_ == STREAM_STATE_IDLE) {
182 CAMERA_LOGI("stream [id:%{public}d], no need to stop", streamId_);
183 return RC_OK;
184 }
185 CAMERA_LOGI("stop stream [id:%{public}d] begin", streamId_);
186
187 state_ = STREAM_STATE_IDLE;
188
189 tunnel_->NotifyStop();
190 cv_.notify_one();
191 if (handler_ != nullptr) {
192 handler_->join();
193 }
194
195 if (!waitingList_.empty()) {
196 auto request = waitingList_.front();
197 if (request != nullptr && request->IsContinous()) {
198 request->Cancel();
199 }
200 }
201 {
202 std::unique_lock<std::mutex> l(wtLock_);
203 waitingList_.clear();
204 }
205
206 RetCode rc = pipeline_->Flush({streamId_});
207 if (rc != RC_OK) {
208 CAMERA_LOGE("stream [id:%{public}d], pipeline flush failed", streamId_);
209 return RC_ERROR;
210 }
211
212 CAMERA_LOGI("stream [id:%{public}d] is waiting buffers returned", streamId_);
213 tunnel_->WaitForAllBufferReturned();
214 CAMERA_LOGI("stream [id:%{public}d], all buffers are returned.", streamId_);
215
216 rc = pipeline_->Stop({streamId_});
217 if (rc != RC_OK) {
218 CAMERA_LOGE("stream [id:%{public}d], pipeline stop failed", streamId_);
219 return RC_ERROR;
220 }
221
222 if (lastRequest_->IsContinous() && !inTransitList_.empty()) {
223 std::shared_ptr<ICaptureMessage> endMessage =
224 std::make_shared<CaptureEndedMessage>(streamId_, lastRequest_->GetCaptureId(),
225 lastRequest_->GetEndTime(), lastRequest_->GetOwnerCount(), tunnel_->GetFrameCount());
226 CAMERA_LOGV("end of stream [%{public}d], ready to send end message", streamId_);
227 messenger_->SendMessage(endMessage);
228 }
229
230 CAMERA_LOGI("stop stream [id:%{public}d] end", streamId_);
231 isFirstRequest = true;
232
233 inTransitList_.clear();
234 tunnel_->CleanBuffers();
235 bufferPool_->ClearBuffers();
236 return RC_OK;
237 }
238
AddRequest(std::shared_ptr<CaptureRequest> & request)239 RetCode StreamBase::AddRequest(std::shared_ptr<CaptureRequest>& request)
240 {
241 CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
242 request->AddOwner(shared_from_this());
243
244 request->SetFirstRequest(false);
245 if (isFirstRequest) {
246 RetCode rc = StartStream();
247 if (rc != RC_OK) {
248 CAMERA_LOGE("start stream [id:%{public}d] failed", streamId_);
249 return RC_ERROR;
250 }
251 request->SetFirstRequest(true);
252 isFirstRequest = false;
253 }
254
255 {
256 std::unique_lock<std::mutex> l(wtLock_);
257 waitingList_.emplace_back(request);
258 cv_.notify_one();
259 }
260
261 return RC_OK;
262 }
263
CancelRequest(const std::shared_ptr<CaptureRequest> & request)264 RetCode StreamBase::CancelRequest(const std::shared_ptr<CaptureRequest>& request)
265 {
266 CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
267
268 {
269 // We don't care if this request is continious-capture or single-capture, just erase it.
270 // And those requests in inTransitList_ removed in HandleResult.
271 std::unique_lock<std::mutex> wl(wtLock_);
272 auto it = std::find(waitingList_.begin(), waitingList_.end(), request);
273 if (it != waitingList_.end()) {
274 waitingList_.erase(it);
275 CAMERA_LOGI("stream [id:%{public}d], cancel request(capture id:%{public}d) success",
276 streamId_, request->GetCaptureId());
277 }
278 }
279
280 if (request->IsContinous()) {
281 // may be this is the last request
282 std::unique_lock<std::mutex> tl(tsLock_);
283 auto it = std::find(inTransitList_.begin(), inTransitList_.end(), request);
284 if (it == inTransitList_.end()) {
285 std::shared_ptr<ICaptureMessage> endMessage =
286 std::make_shared<CaptureEndedMessage>(streamId_, request->GetCaptureId(), request->GetEndTime(),
287 request->GetOwnerCount(), tunnel_->GetFrameCount());
288 CAMERA_LOGV("end of stream [%{public}d], ready to send end message", streamId_);
289 messenger_->SendMessage(endMessage);
290 pipeline_->CancelCapture({streamId_});
291 }
292 }
293 return RC_OK;
294 }
295
HandleRequest()296 void StreamBase::HandleRequest()
297 {
298 if (waitingList_.empty()) {
299 std::unique_lock<std::mutex> l(wtLock_);
300 if (waitingList_.empty()) {
301 cv_.wait(l, [this] { return !(state_ == STREAM_STATE_BUSY && waitingList_.empty()); });
302 }
303 }
304 if (state_ != STREAM_STATE_BUSY) {
305 return;
306 }
307
308 std::shared_ptr<CaptureRequest> request = nullptr;
309 {
310 // keep a copy of continious-capture in waitingList_, unless it's going to be canceled.
311 std::unique_lock<std::mutex> l(wtLock_);
312 if (waitingList_.empty()) {
313 return;
314 }
315 request = waitingList_.front();
316 CHECK_IF_PTR_NULL_RETURN_VOID(request);
317 if (!request->IsContinous()) {
318 waitingList_.pop_front();
319 }
320 }
321 if (request == nullptr) {
322 CAMERA_LOGE("fatal error, stream [%{public}d] request list is not empty, but can't get one", streamId_);
323 return;
324 }
325
326 if (request->NeedCancel()) {
327 return;
328 }
329
330 request->Process(streamId_);
331
332 return;
333 }
334
Capture(const std::shared_ptr<CaptureRequest> & request)335 RetCode StreamBase::Capture(const std::shared_ptr<CaptureRequest>& request)
336 {
337 CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
338 CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
339
340 RetCode rc = RC_ERROR;
341 if (request->IsFirstOne() && !request->IsContinous()) {
342 uint32_t n = GetBufferCount();
343 for (uint32_t i = 0; i < n; i++) {
344 DeliverBuffer();
345 }
346 } else {
347 do {
348 rc = DeliverBuffer();
349 } while (rc != RC_OK && state_ == STREAM_STATE_BUSY);
350 }
351
352 if (request->NeedCancel()) {
353 CAMERA_LOGE("StreamBase::Capture stream [id:%{public}d] request->NeedCancel", streamId_);
354 return RC_OK;
355 }
356
357 rc = pipeline_->Config({streamId_}, request->GetCaptureSetting());
358 if (rc != RC_OK) {
359 CAMERA_LOGE("stream [id:%{public}d] config pipeline failed.", streamId_);
360 return RC_ERROR;
361 }
362
363 rc = pipeline_->Capture({streamId_}, request->GetCaptureId());
364 if (rc != RC_OK) {
365 CAMERA_LOGE("stream [id:%{public}d] take a capture failed.", streamId_);
366 return RC_ERROR;
367 }
368
369 {
370 std::unique_lock<std::mutex> l(tsLock_);
371 inTransitList_.emplace_back(request);
372 }
373
374 if (request->IsFirstOne()) {
375 if (messenger_ == nullptr) {
376 CAMERA_LOGE("stream [id:%{public}d] can't send message, messenger_ is null", streamId_);
377 return RC_ERROR;
378 }
379 std::shared_ptr<ICaptureMessage> startMessage = std::make_shared<CaptureStartedMessage>(
380 streamId_, request->GetCaptureId(), request->GetBeginTime(), request->GetOwnerCount());
381 messenger_->SendMessage(startMessage);
382 request->SetFirstRequest(false);
383 }
384
385 return RC_OK;
386 }
387
DeliverBuffer()388 RetCode StreamBase::DeliverBuffer()
389 {
390 CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, RC_ERROR);
391 CHECK_IF_PTR_NULL_RETURN_VALUE(bufferPool_, RC_ERROR);
392
393 std::shared_ptr<IBuffer> buffer = tunnel_->GetBuffer();
394 CHECK_IF_PTR_NULL_RETURN_VALUE(buffer, RC_ERROR);
395
396 buffer->SetEncodeType(streamConfig_.encodeType);
397 buffer->SetStreamId(streamId_);
398 bufferPool_->AddBuffer(buffer);
399 CAMERA_LOGI("stream [id:%{public}d] enqueue buffer index:%{public}d", streamId_, buffer->GetIndex());
400 return RC_OK;
401 }
402
HandleResult(std::shared_ptr<IBuffer> & buffer)403 void StreamBase::HandleResult(std::shared_ptr<IBuffer>& buffer)
404 {
405 CHECK_IF_PTR_NULL_RETURN_VOID(buffer);
406 if (buffer->GetBufferStatus() == CAMERA_BUFFER_STATUS_INVALID) {
407 CAMERA_LOGI("stream [id:%{public}d], this buffer(index:%{public}d) has nothing to do with request.", streamId_,
408 buffer->GetIndex());
409 ReceiveBuffer(buffer);
410 return;
411 }
412
413 if (buffer->GetStreamId() != streamId_) {
414 CAMERA_LOGE("fatal error, stream [%{public}d] reveived a wrong buffer, index:%{public}d. \
415 this buffer belongs to stream:%{public}d", streamId_, buffer->GetIndex(), buffer->GetStreamId());
416 return;
417 }
418
419 int32_t captureId = buffer->GetCaptureId();
420 std::shared_ptr<CaptureRequest> request = nullptr;
421 {
422 std::unique_lock<std::mutex> l(tsLock_);
423 for (auto& r : inTransitList_) {
424 if (r == nullptr) {
425 continue;
426 }
427 if (r->GetCaptureId() == captureId) {
428 request = r;
429 }
430 }
431 }
432 if (request == nullptr) {
433 CAMERA_LOGI("stream [id:%{public}d], this buffer(index:%{public}d) has nothing to do with request.",
434 streamId_, buffer->GetIndex());
435 buffer->SetBufferStatus(CAMERA_BUFFER_STATUS_INVALID);
436 ReceiveBuffer(buffer);
437 return;
438 }
439 request->AttachBuffer(buffer);
440 // To synchronize multiple stream, bottom-layer device stream need be synchronized first.
441 request->OnResult(streamId_);
442 lastRequest_ = request;
443 return;
444 }
445
OnFrame(const std::shared_ptr<CaptureRequest> & request)446 RetCode StreamBase::OnFrame(const std::shared_ptr<CaptureRequest>& request)
447 {
448 CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
449 CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
450 auto buffer = request->GetAttachedBuffer();
451 CameraBufferStatus status = buffer->GetBufferStatus();
452 if (status != CAMERA_BUFFER_STATUS_OK) {
453 if (status != CAMERA_BUFFER_STATUS_DROP) {
454 std::shared_ptr<ICaptureMessage> errorMessage =
455 std::make_shared<CaptureErrorMessage>(streamId_, request->GetCaptureId(), request->GetEndTime(),
456 request->GetOwnerCount(), static_cast<StreamError>(status));
457 messenger_->SendMessage(errorMessage);
458 }
459 }
460 if (request->NeedShutterCallback() && messenger_ != nullptr) {
461 std::shared_ptr<ICaptureMessage> shutterMessage = std::make_shared<FrameShutterMessage>(
462 streamId_, request->GetCaptureId(), request->GetEndTime(), request->GetOwnerCount());
463 messenger_->SendMessage(shutterMessage);
464 }
465
466 bool isEnded = false;
467 if (!request->IsContinous()) {
468 isEnded = true;
469 } else if (request->NeedCancel()) {
470 isEnded = true;
471 }
472
473 {
474 // inTransitList_ may has multiple copies of continious-capture request, we just need erase one of them.
475 std::unique_lock<std::mutex> l(tsLock_);
476 for (auto it = inTransitList_.begin(); it != inTransitList_.end(); it++) {
477 if ((*it) == request) {
478 inTransitList_.erase(it);
479 break;
480 }
481 }
482
483 if (isEnded) {
484 // if this is the last request of capture, send CaptureEndedMessage.
485 auto it = std::find(inTransitList_.begin(), inTransitList_.end(), request);
486 if (it == inTransitList_.end()) {
487 std::shared_ptr<ICaptureMessage> endMessage =
488 std::make_shared<CaptureEndedMessage>(streamId_, request->GetCaptureId(), request->GetEndTime(),
489 request->GetOwnerCount(), tunnel_->GetFrameCount());
490 CAMERA_LOGV("end of stream [%d], ready to send end message, capture id = %d",
491 streamId_, request->GetCaptureId());
492 messenger_->SendMessage(endMessage);
493 pipeline_->CancelCapture({streamId_});
494 }
495 }
496 }
497
498 ReceiveBuffer(buffer);
499 return RC_OK;
500 }
501
ReceiveBuffer(std::shared_ptr<IBuffer> & buffer)502 RetCode StreamBase::ReceiveBuffer(std::shared_ptr<IBuffer>& buffer)
503 {
504 CHECK_IF_PTR_NULL_RETURN_VALUE(buffer, RC_ERROR);
505 CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, RC_ERROR);
506 CHECK_IF_PTR_NULL_RETURN_VALUE(bufferPool_, RC_ERROR);
507
508 CAMERA_LOGI("stream [id:%{public}d] dequeue buffer index:%{public}d, status:%{public}d",
509 streamId_, buffer->GetIndex(), buffer->GetBufferStatus());
510 bufferPool_->ReturnBuffer(buffer);
511 tunnel_->PutBuffer(buffer);
512 return RC_OK;
513 }
514
GetFrameCount() const515 uint64_t StreamBase::GetFrameCount() const
516 {
517 CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, 0);
518 return tunnel_->GetFrameCount();
519 }
520
AttachStreamTunnel(std::shared_ptr<StreamTunnel> & tunnel)521 RetCode StreamBase::AttachStreamTunnel(std::shared_ptr<StreamTunnel>& tunnel)
522 {
523 if (state_ == STREAM_STATE_BUSY || state_ == STREAM_STATE_OFFLINE) {
524 return RC_ERROR;
525 }
526
527 tunnel_ = tunnel;
528 CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, RC_ERROR);
529 tunnel_->SetBufferCount(GetBufferCount());
530 TunnelConfig config = {(uint32_t)streamConfig_.width, (uint32_t)streamConfig_.height,
531 (uint32_t)streamConfig_.format, streamConfig_.usage};
532 tunnel_->Config(config);
533
534 streamConfig_.tunnelMode = true;
535 return RC_OK;
536 }
537
DetachStreamTunnel()538 RetCode StreamBase::DetachStreamTunnel()
539 {
540 if (state_ == STREAM_STATE_BUSY || state_ == STREAM_STATE_OFFLINE) {
541 return RC_ERROR;
542 }
543
544 tunnel_.reset();
545 streamConfig_.tunnelMode = false;
546
547 state_ = STREAM_STATE_IDLE;
548 return RC_OK;
549 }
550
ChangeToOfflineStream(std::shared_ptr<OfflineStream> offlineStream)551 RetCode StreamBase::ChangeToOfflineStream(std::shared_ptr<OfflineStream> offlineStream)
552 {
553 (void)offlineStream;
554 return RC_OK;
555 }
556
GetUsage()557 uint64_t StreamBase::GetUsage()
558 {
559 return CAMERA_USAGE_SW_WRITE_OFTEN | CAMERA_USAGE_SW_READ_OFTEN | CAMERA_USAGE_MEM_DMA;
560 }
561
GetBufferCount()562 uint32_t StreamBase::GetBufferCount()
563 {
564 return 3; // 3: buffer count
565 }
566
GetStreamAttribute() const567 StreamConfiguration StreamBase::GetStreamAttribute() const
568 {
569 return streamConfig_;
570 }
571
GetStreamId() const572 int32_t StreamBase::GetStreamId() const
573 {
574 return streamId_;
575 }
576
IsRunning() const577 bool StreamBase::IsRunning() const
578 {
579 return state_ == STREAM_STATE_BUSY;
580 }
581
GetTunnelMode() const582 bool StreamBase::GetTunnelMode() const
583 {
584 return streamConfig_.tunnelMode;
585 }
586 } // namespace OHOS::Camera
587