• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "stream_base.h"
17 #include "buffer_adapter.h"
18 #include "buffer_manager.h"
19 #include "watchdog.h"
20 
21 namespace OHOS::Camera {
22 std::map<StreamIntent, std::string> IStream::g_avaliableStreamType = {
23     {PREVIEW, STREAM_INTENT_TO_STRING(PREVIEW)},
24     {VIDEO, STREAM_INTENT_TO_STRING(VIDEO)},
25     {STILL_CAPTURE, STREAM_INTENT_TO_STRING(STILL_CAPTURE)},
26     {POST_VIEW, STREAM_INTENT_TO_STRING(POST_VIEW)},
27     {ANALYZE, STREAM_INTENT_TO_STRING(ANALYZE)},
28     {CUSTOM, STREAM_INTENT_TO_STRING(CUSTOM)},
29 };
30 
StreamBase(const int32_t id,const StreamIntent type,std::shared_ptr<IPipelineCore> & p,std::shared_ptr<CaptureMessageOperator> & m)31 StreamBase::StreamBase(const int32_t id,
32                        const StreamIntent type,
33                        std::shared_ptr<IPipelineCore>& p,
34                        std::shared_ptr<CaptureMessageOperator>& m)
35 {
36     streamId_ = id;
37     streamType_ = static_cast<int32_t>(type);
38     pipelineCore_ = p;
39     messenger_ = m;
40 }
41 
~StreamBase()42 StreamBase::~StreamBase()
43 {
44     if (state_ == STREAM_STATE_BUSY) {
45         StopStream();
46     }
47 
48     if (hostStreamMgr_ != nullptr) {
49         hostStreamMgr_->DestroyHostStream({streamId_});
50     }
51 
52     if (pipeline_ != nullptr) {
53         pipeline_->DestroyPipeline({streamId_});
54     }
55 }
56 
ConfigStream(StreamConfiguration & config)57 RetCode StreamBase::ConfigStream(StreamConfiguration& config)
58 {
59     if (state_ != STREAM_STATE_IDLE) {
60         return RC_ERROR;
61     }
62 
63     streamConfig_ = config;
64     streamConfig_.usage = GetUsage();
65     if (tunnel_ != nullptr) {
66         streamConfig_.tunnelMode = true;
67     }
68     streamConfig_.bufferCount = GetBufferCount();
69     streamConfig_.maxBatchCaptureCount = 1;
70     streamConfig_.maxCaptureCount = 1;
71     // get device cappability to overide configuration
72     return RC_OK;
73 }
74 
CommitStream()75 RetCode StreamBase::CommitStream()
76 {
77     if (state_ != STREAM_STATE_IDLE) {
78         return RC_ERROR;
79     }
80 
81     CHECK_IF_PTR_NULL_RETURN_VALUE(pipelineCore_, RC_ERROR);
82 
83     pipeline_ = pipelineCore_->GetStreamPipelineCore();
84     CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
85 
86     hostStreamMgr_ = pipelineCore_->GetHostStreamMgr();
87     CHECK_IF_PTR_NULL_RETURN_VALUE(hostStreamMgr_, RC_ERROR);
88 
89     HostStreamInfo info;
90     info.type_ = static_cast<StreamIntent>(streamType_);
91     info.streamId_ = streamId_;
92     info.width_ = streamConfig_.width;
93     info.height_ = streamConfig_.height;
94     info.format_ = streamConfig_.format;
95     info.usage_ = streamConfig_.usage;
96     info.encodeType_ = streamConfig_.encodeType;
97 
98     if (streamConfig_.tunnelMode) {
99         BufferManager* mgr = BufferManager::GetInstance();
100         CHECK_IF_PTR_NULL_RETURN_VALUE(mgr, RC_ERROR);
101 
102         if (bufferPool_ == nullptr) {
103             poolId_ = mgr->GenerateBufferPoolId();
104             CHECK_IF_EQUAL_RETURN_VALUE(poolId_, 0, RC_ERROR);
105 
106             bufferPool_ = mgr->GetBufferPool(poolId_);
107             if (bufferPool_ == nullptr) {
108                 CAMERA_LOGE("stream [id:%{public}d] get buffer pool failed.", streamId_);
109                 return RC_ERROR;
110             }
111         }
112 
113         info.bufferPoolId_ = poolId_;
114         info.bufferCount_ = GetBufferCount();
115         RetCode rc = bufferPool_->Init(streamConfig_.width, streamConfig_.height, streamConfig_.usage,
116                                        streamConfig_.format, GetBufferCount(), CAMERA_BUFFER_SOURCE_TYPE_EXTERNAL);
117         if (rc != RC_OK) {
118             CAMERA_LOGE("stream [id:%{public}d] initialize buffer pool failed.", streamId_);
119             return RC_ERROR;
120         }
121     }
122 
123     RetCode rc = hostStreamMgr_->CreateHostStream(info, [this](std::shared_ptr<IBuffer> buffer) {
124         HandleResult(buffer);
125         return;
126     });
127     if (rc != RC_OK) {
128         CAMERA_LOGE("commit stream [id:%{public}d] to pipeline failed.", streamId_);
129         return RC_ERROR;
130     }
131     CAMERA_LOGI("commit a stream to pipeline id[%{public}d], w[%{public}d], h[%{public}d], poolId[%{public}llu], \
132         encodeType = %{public}d", info.streamId_, info.width_, info.height_, info.bufferPoolId_, info.encodeType_);
133     state_ = STREAM_STATE_ACTIVE;
134 
135     return RC_OK;
136 }
137 
StartStream()138 RetCode StreamBase::StartStream()
139 {
140     CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
141 
142     if (state_ != STREAM_STATE_ACTIVE) {
143         return RC_ERROR;
144     }
145 
146     CAMERA_LOGI("start stream [id:%{public}d] begin", streamId_);
147     tunnel_->NotifyStart();
148 
149     RetCode rc = pipeline_->Prepare({streamId_});
150     if (rc != RC_OK) {
151         CAMERA_LOGE("pipeline [id:%{public}d] prepare failed", streamId_);
152         return rc;
153     }
154 
155     state_ = STREAM_STATE_BUSY;
156     std::string threadName =
157         g_avaliableStreamType[static_cast<StreamIntent>(streamType_)] + "#" + std::to_string(streamId_);
158     handler_ = std::make_unique<std::thread>([this, &threadName] {
159         prctl(PR_SET_NAME, threadName.c_str());
160         while (state_ == STREAM_STATE_BUSY) {
161             HandleRequest();
162         }
163     });
164     if (handler_ == nullptr) {
165         state_ = STREAM_STATE_ACTIVE;
166         return RC_ERROR;
167     }
168 
169     rc = pipeline_->Start({streamId_});
170     if (rc != RC_OK) {
171         CAMERA_LOGE("pipeline [%{public}d] start failed", streamId_);
172         return RC_ERROR;
173     }
174     CAMERA_LOGI("start stream [id:%{public}d] end", streamId_);
175 
176     return RC_OK;
177 }
178 
StopStream()179 RetCode StreamBase::StopStream()
180 {
181     CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
182     if (state_ == STREAM_STATE_IDLE) {
183         CAMERA_LOGI("stream [id:%{public}d], no need to stop", streamId_);
184         return RC_OK;
185     }
186     CAMERA_LOGI("stop stream [id:%{public}d] begin", streamId_);
187 
188     state_ = STREAM_STATE_IDLE;
189 
190     tunnel_->NotifyStop();
191     cv_.notify_one();
192     if (handler_ != nullptr) {
193         handler_->join();
194     }
195 
196     if (!waitingList_.empty()) {
197         auto request = waitingList_.front();
198         if (request != nullptr && request->IsContinous()) {
199             request->Cancel();
200         }
201     }
202     {
203         std::unique_lock<std::mutex> l(wtLock_);
204         waitingList_.clear();
205     }
206 
207     RetCode rc = pipeline_->Flush({streamId_});
208     if (rc != RC_OK) {
209         CAMERA_LOGE("stream [id:%{public}d], pipeline flush failed", streamId_);
210         return RC_ERROR;
211     }
212 
213     CAMERA_LOGI("stream [id:%{public}d] is waiting buffers returned", streamId_);
214     tunnel_->WaitForAllBufferReturned();
215     CAMERA_LOGI("stream [id:%{public}d], all buffers are returned.", streamId_);
216 
217     rc = pipeline_->Stop({streamId_});
218     if (rc != RC_OK) {
219         CAMERA_LOGE("stream [id:%{public}d], pipeline stop failed", streamId_);
220         return RC_ERROR;
221     }
222 
223     if (lastRequest_->IsContinous() && !inTransitList_.empty()) {
224         std::shared_ptr<ICaptureMessage> endMessage =
225             std::make_shared<CaptureEndedMessage>(streamId_, lastRequest_->GetCaptureId(),
226             lastRequest_->GetEndTime(), lastRequest_->GetOwnerCount(), tunnel_->GetFrameCount());
227         CAMERA_LOGV("end of stream [%{public}d], ready to send end message", streamId_);
228         messenger_->SendMessage(endMessage);
229     }
230 
231     CAMERA_LOGI("stop stream [id:%{public}d] end", streamId_);
232     isFirstRequest = true;
233 
234     inTransitList_.clear();
235     tunnel_->CleanBuffers();
236     bufferPool_->ClearBuffers();
237     return RC_OK;
238 }
239 
AddRequest(std::shared_ptr<CaptureRequest> & request)240 RetCode StreamBase::AddRequest(std::shared_ptr<CaptureRequest>& request)
241 {
242     CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
243     request->AddOwner(shared_from_this());
244 
245     request->SetFirstRequest(false);
246     if (isFirstRequest) {
247         uint32_t n = GetBufferCount();
248         if (!request->IsContinous()) {
249             for (uint32_t i = 0; i < n; i++) {
250                 DeliverBuffer();
251             }
252         }
253 
254         RetCode rc = StartStream();
255         if (rc != RC_OK) {
256             CAMERA_LOGE("start stream [id:%{public}d] failed", streamId_);
257             return RC_ERROR;
258         }
259         request->SetFirstRequest(true);
260         isFirstRequest = false;
261     }
262 
263     {
264         std::unique_lock<std::mutex> l(wtLock_);
265         waitingList_.emplace_back(request);
266         cv_.notify_one();
267     }
268 
269     return RC_OK;
270 }
271 
CancelRequest(const std::shared_ptr<CaptureRequest> & request)272 RetCode StreamBase::CancelRequest(const std::shared_ptr<CaptureRequest>& request)
273 {
274     CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
275 
276     {
277         // We don't care if this request is continious-capture or single-capture, just erase it.
278         // And those requests in inTransitList_ will be removed in HandleResult.
279         std::unique_lock<std::mutex> wl(wtLock_);
280         auto it = std::find(waitingList_.begin(), waitingList_.end(), request);
281         if (it != waitingList_.end()) {
282             waitingList_.erase(it);
283             CAMERA_LOGI("stream [id:%{public}d], cancel request(capture id:%{public}d) success",
284                 streamId_, request->GetCaptureId());
285         }
286     }
287 
288     if (request->IsContinous()) {
289         // may be this is the last request
290         std::unique_lock<std::mutex> tl(tsLock_);
291         auto it = std::find(inTransitList_.begin(), inTransitList_.end(), request);
292         if (it == inTransitList_.end()) {
293             std::shared_ptr<ICaptureMessage> endMessage =
294                 std::make_shared<CaptureEndedMessage>(streamId_, request->GetCaptureId(), request->GetEndTime(),
295                                                       request->GetOwnerCount(), tunnel_->GetFrameCount());
296             CAMERA_LOGV("end of stream [%{public}d], ready to send end message", streamId_);
297             messenger_->SendMessage(endMessage);
298             pipeline_->CancelCapture({streamId_});
299         }
300     }
301     return RC_OK;
302 }
303 
HandleRequest()304 void StreamBase::HandleRequest()
305 {
306     if (waitingList_.empty()) {
307         std::unique_lock<std::mutex> l(wtLock_);
308         if (waitingList_.empty()) {
309             cv_.wait(l, [this] { return !(state_ == STREAM_STATE_BUSY && waitingList_.empty()); });
310         }
311     }
312     if (state_ != STREAM_STATE_BUSY) {
313         return;
314     }
315 
316     std::shared_ptr<CaptureRequest> request = nullptr;
317     {
318         // keep a copy of continious-capture in waitingList_, unless it's going to be canceled.
319         std::unique_lock<std::mutex> l(wtLock_);
320         if (waitingList_.empty()) {
321             return;
322         }
323         request = waitingList_.front();
324         CHECK_IF_PTR_NULL_RETURN_VOID(request);
325         if (!request->IsContinous()) {
326             waitingList_.pop_front();
327         }
328     }
329     if (request == nullptr) {
330         CAMERA_LOGE("fatal error, stream [%{public}d] request list is not empty, but can't get one", streamId_);
331         return;
332     }
333 
334     {
335         std::unique_lock<std::mutex> l(tsLock_);
336         if (request->NeedCancel()) {
337             return;
338         }
339         inTransitList_.emplace_back(request);
340     }
341     request->Process(streamId_);
342 
343     return;
344 }
345 
Capture(const std::shared_ptr<CaptureRequest> & request)346 RetCode StreamBase::Capture(const std::shared_ptr<CaptureRequest>& request)
347 {
348     CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
349     CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
350 
351     RetCode rc = RC_ERROR;
352 
353     rc = pipeline_->Config({streamId_}, request->GetCaptureSetting());
354     if (rc != RC_OK) {
355         CAMERA_LOGE("stream [id:%{public}d] config pipeline failed.", streamId_);
356         return RC_ERROR;
357     }
358 
359     rc = pipeline_->Capture({streamId_}, request->GetCaptureId());
360     if (rc != RC_OK) {
361         CAMERA_LOGE("stream [id:%{public}d] take a capture failed.", streamId_);
362         return RC_ERROR;
363     }
364 
365     if (request->IsFirstOne()) {
366         if (messenger_ == nullptr) {
367             CAMERA_LOGE("stream [id:%{public}d] can't send message, messenger_ is null", streamId_);
368             return RC_ERROR;
369         }
370         std::shared_ptr<ICaptureMessage> startMessage = std::make_shared<CaptureStartedMessage>(
371             streamId_, request->GetCaptureId(), request->GetBeginTime(), request->GetOwnerCount());
372         messenger_->SendMessage(startMessage);
373         request->SetFirstRequest(false);
374     }
375 
376     // DeliverBuffer must be called after Capture, or this capture request will miss a buffer.
377     do {
378         rc = DeliverBuffer();
379     } while (rc != RC_OK && state_ == STREAM_STATE_BUSY);
380 
381     return RC_OK;
382 }
383 
DeliverBuffer()384 RetCode StreamBase::DeliverBuffer()
385 {
386     CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, RC_ERROR);
387     CHECK_IF_PTR_NULL_RETURN_VALUE(bufferPool_, RC_ERROR);
388 
389     std::shared_ptr<IBuffer> buffer = tunnel_->GetBuffer();
390     CHECK_IF_PTR_NULL_RETURN_VALUE(buffer, RC_ERROR);
391 
392     buffer->SetEncodeType(streamConfig_.encodeType);
393     buffer->SetStreamId(streamId_);
394     bufferPool_->AddBuffer(buffer);
395     CAMERA_LOGI("stream [id:%{public}d] enqueue buffer index:%{public}d", streamId_, buffer->GetIndex());
396     return RC_OK;
397 }
398 
HandleResult(std::shared_ptr<IBuffer> & buffer)399 void StreamBase::HandleResult(std::shared_ptr<IBuffer>& buffer)
400 {
401     CHECK_IF_PTR_NULL_RETURN_VOID(buffer);
402     if (buffer->GetBufferStatus() == CAMERA_BUFFER_STATUS_INVALID) {
403         CAMERA_LOGI("stream [id:%{public}d], this buffer(index:%{public}d) has nothing to do with request.", streamId_,
404                     buffer->GetIndex());
405         ReceiveBuffer(buffer);
406         return;
407     }
408 
409     if (buffer->GetStreamId() != streamId_) {
410         CAMERA_LOGE("fatal error, stream [%{public}d] reveived a wrong buffer, index:%{public}d. \
411             this buffer belongs to stream:%{public}d", streamId_, buffer->GetIndex(), buffer->GetStreamId());
412         return;
413     }
414 
415     int32_t captureId = buffer->GetCaptureId();
416     std::shared_ptr<CaptureRequest> request = nullptr;
417     {
418         std::unique_lock<std::mutex> l(tsLock_);
419         for (auto& r : inTransitList_) {
420             if (r == nullptr) {
421                 continue;
422             }
423             if (r->GetCaptureId() == captureId) {
424                 request = r;
425             }
426         }
427     }
428     if (request == nullptr) {
429         CAMERA_LOGI("stream [id:%{public}d], this buffer(index:%{public}d) has nothing to do with request.",
430             streamId_, buffer->GetIndex());
431         buffer->SetBufferStatus(CAMERA_BUFFER_STATUS_INVALID);
432         ReceiveBuffer(buffer);
433         return;
434     }
435     request->AttachBuffer(buffer);
436     // To synchronize multiple stream, bottom-layer device stream need be synchronized first.
437     request->OnResult(streamId_);
438     lastRequest_ = request;
439 
440     return;
441 }
442 
OnFrame(const std::shared_ptr<CaptureRequest> & request)443 RetCode StreamBase::OnFrame(const std::shared_ptr<CaptureRequest>& request)
444 {
445     CHECK_IF_PTR_NULL_RETURN_VALUE(request, RC_ERROR);
446     CHECK_IF_PTR_NULL_RETURN_VALUE(pipeline_, RC_ERROR);
447     auto buffer = request->GetAttachedBuffer();
448     CameraBufferStatus status = buffer->GetBufferStatus();
449     if (status != CAMERA_BUFFER_STATUS_OK) {
450         if (status != CAMERA_BUFFER_STATUS_DROP) {
451             std::shared_ptr<ICaptureMessage> errorMessage =
452                 std::make_shared<CaptureErrorMessage>(streamId_, request->GetCaptureId(), request->GetEndTime(),
453                                                       request->GetOwnerCount(), static_cast<StreamError>(status));
454             messenger_->SendMessage(errorMessage);
455         }
456     }
457     if (request->NeedShutterCallback() && messenger_ != nullptr) {
458         std::shared_ptr<ICaptureMessage> shutterMessage = std::make_shared<FrameShutterMessage>(
459             streamId_, request->GetCaptureId(), request->GetEndTime(), request->GetOwnerCount());
460         messenger_->SendMessage(shutterMessage);
461     }
462 
463     bool isEnded = false;
464     if (!request->IsContinous()) {
465         isEnded = true;
466     } else if (request->NeedCancel()) {
467         isEnded = true;
468     }
469 
470     {
471         // inTransitList_ may have multiple copies of continious-capture request, we just need erase one of them.
472         std::unique_lock<std::mutex> l(tsLock_);
473         for (auto it = inTransitList_.begin(); it != inTransitList_.end(); it++) {
474             if ((*it) == request) {
475                 inTransitList_.erase(it);
476                 break;
477             }
478         }
479 
480         if (isEnded) {
481             // if this is the last request of capture, send CaptureEndedMessage.
482             auto it = std::find(inTransitList_.begin(), inTransitList_.end(), request);
483             if (it == inTransitList_.end()) {
484                 std::shared_ptr<ICaptureMessage> endMessage =
485                     std::make_shared<CaptureEndedMessage>(streamId_, request->GetCaptureId(), request->GetEndTime(),
486                                                           request->GetOwnerCount(), tunnel_->GetFrameCount());
487                 CAMERA_LOGV("end of stream [%d], ready to send end message, capture id = %d",
488                     streamId_, request->GetCaptureId());
489                 messenger_->SendMessage(endMessage);
490                 pipeline_->CancelCapture({streamId_});
491             }
492         }
493     }
494 
495     ReceiveBuffer(buffer);
496     return RC_OK;
497 }
498 
ReceiveBuffer(std::shared_ptr<IBuffer> & buffer)499 RetCode StreamBase::ReceiveBuffer(std::shared_ptr<IBuffer>& buffer)
500 {
501     CHECK_IF_PTR_NULL_RETURN_VALUE(buffer, RC_ERROR);
502     CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, RC_ERROR);
503     CHECK_IF_PTR_NULL_RETURN_VALUE(bufferPool_, RC_ERROR);
504 
505     CAMERA_LOGI("stream [id:%{public}d] dequeue buffer index:%{public}d, status:%{public}d",
506         streamId_, buffer->GetIndex(), buffer->GetBufferStatus());
507     bufferPool_->ReturnBuffer(buffer);
508     tunnel_->PutBuffer(buffer);
509     return RC_OK;
510 }
511 
GetFrameCount() const512 uint64_t StreamBase::GetFrameCount() const
513 {
514     CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, 0);
515     return tunnel_->GetFrameCount();
516 }
517 
AttachStreamTunnel(std::shared_ptr<StreamTunnel> & tunnel)518 RetCode StreamBase::AttachStreamTunnel(std::shared_ptr<StreamTunnel>& tunnel)
519 {
520     if (state_ == STREAM_STATE_BUSY || state_ == STREAM_STATE_OFFLINE) {
521         return RC_ERROR;
522     }
523 
524     tunnel_ = tunnel;
525     CHECK_IF_PTR_NULL_RETURN_VALUE(tunnel_, RC_ERROR);
526     tunnel_->SetBufferCount(GetBufferCount());
527     TunnelConfig config = {streamConfig_.width, streamConfig_.height, streamConfig_.format, streamConfig_.usage};
528     tunnel_->Config(config);
529 
530     streamConfig_.tunnelMode = true;
531     return RC_OK;
532 }
533 
DetachStreamTunnel()534 RetCode StreamBase::DetachStreamTunnel()
535 {
536     if (state_ == STREAM_STATE_BUSY || state_ == STREAM_STATE_OFFLINE) {
537         return RC_ERROR;
538     }
539 
540     tunnel_.reset();
541     streamConfig_.tunnelMode = false;
542 
543     state_ = STREAM_STATE_IDLE;
544     return RC_OK;
545 }
546 
ChangeToOfflineStream(std::shared_ptr<OfflineStream> offlineStream)547 RetCode StreamBase::ChangeToOfflineStream(std::shared_ptr<OfflineStream> offlineStream)
548 {
549     return RC_OK;
550 }
551 
GetUsage()552 uint64_t StreamBase::GetUsage()
553 {
554     return CAMERA_USAGE_SW_WRITE_OFTEN | CAMERA_USAGE_SW_READ_OFTEN | CAMERA_USAGE_MEM_DMA;
555 }
556 
GetBufferCount()557 uint32_t StreamBase::GetBufferCount()
558 {
559     return 3; // 3: buffer count
560 }
561 
GetStreamAttribute() const562 StreamConfiguration StreamBase::GetStreamAttribute() const
563 {
564     return streamConfig_;
565 }
566 
GetStreamId() const567 int32_t StreamBase::GetStreamId() const
568 {
569     return streamId_;
570 }
571 
IsRunning() const572 bool StreamBase::IsRunning() const
573 {
574     return state_ == STREAM_STATE_BUSY;
575 }
576 
GetTunnelMode() const577 bool StreamBase::GetTunnelMode() const
578 {
579     return streamConfig_.tunnelMode;
580 }
581 } // namespace OHOS::Camera
582