• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "SampleQueue"
17 
18 #include <sstream>
19 #include <securec.h>
20 #include "common/log.h"
21 #include "sample_queue.h"
22 #include "avcodec_trace.h"
23 
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_PLAYER, "SampleQueue" };
26 constexpr int32_t INVALID_TRACK_ID = -1;
27 }
28 
29 namespace OHOS {
30 namespace Media {
31 
32 class SampleBufferConsumerListener : public IConsumerListener {
33 public:
SampleBufferConsumerListener(std::shared_ptr<SampleQueue> sampleQueue)34     explicit SampleBufferConsumerListener(std::shared_ptr<SampleQueue> sampleQueue)
35         : sampleQueue_(std::move(sampleQueue))
36     {}
37     virtual ~SampleBufferConsumerListener() = default;
38 
OnBufferAvailable()39     void OnBufferAvailable() override
40     {
41         if (auto sampleQueue = sampleQueue_.lock()) {
42             sampleQueue->OnBufferConsumer();
43         } else {
44             MEDIA_LOG_E("consumer listener: Invalid sampleQueue instance.");
45         }
46     }
47 
48 private:
49     std::weak_ptr<SampleQueue> sampleQueue_;
50 };
51 
52 class SampleBufferProducerListener : public IRemoteStub<IProducerListener> {
53 public:
SampleBufferProducerListener(std::shared_ptr<SampleQueue> sampleQueue)54     explicit SampleBufferProducerListener(std::shared_ptr<SampleQueue> sampleQueue)
55         : sampleQueue_(std::move(sampleQueue))
56     {}
57     virtual ~SampleBufferProducerListener() = default;
58 
OnBufferAvailable()59     void OnBufferAvailable() override
60     {
61         if (auto sampleQueue = sampleQueue_.lock()) {
62             sampleQueue->OnBufferAvailable();
63         } else {
64             MEDIA_LOG_E("prodecer listener: Invalid sampleQueue instance.");
65         }
66     }
67 
68 private:
69     std::weak_ptr<SampleQueue> sampleQueue_;
70 };
71 
Init(const Config & config)72 Status SampleQueue::Init(const Config& config)
73 {
74     config_ = config;
75     config_.queueSize_ = std::min(config.queueSize_, MAX_SAMPLE_QUEUE_SIZE);
76     config_.bufferCap_ = std::min(config.bufferCap_, MAX_SAMPLE_BUFFER_CAP);
77     config_.queueName_ = "SampleQueue_" + std::to_string(config_.queueId_);
78     sampleBufferQueue_ = AVBufferQueue::Create(config_.queueSize_, MemoryType::VIRTUAL_MEMORY, config_.queueName_);
79     FALSE_RETURN_V_MSG_E(sampleBufferQueue_ != nullptr, Status::ERROR_NO_MEMORY, "AVBufferQueue::Create failed");
80     if (config_.isFlvLiveStream_) {
81         config_.queueSize_ = MAX_SAMPLE_QUEUE_SIZE;
82         sampleBufferQueue_->SetLargerQueueSize(config_.queueSize_);
83     }
84 
85     sampleBufferQueueProducer_ = sampleBufferQueue_->GetProducer();
86     sptr<IProducerListener> producerListener = OHOS::sptr<SampleBufferProducerListener>::MakeSptr(shared_from_this());
87     FALSE_RETURN_V_MSG_E(producerListener != nullptr, Status::ERROR_NO_MEMORY, "SampleBufferProducerListener nullptr");
88     sampleBufferQueueProducer_->SetBufferAvailableListener(producerListener);
89 
90     sampleBufferQueueConsumer_ = sampleBufferQueue_->GetConsumer();
91     sptr<IConsumerListener> consumerListener = new(std::nothrow) SampleBufferConsumerListener(shared_from_this());
92     FALSE_RETURN_V_MSG_E(consumerListener != nullptr, Status::ERROR_NO_MEMORY, "SampleBufferConsumerListener nullptr");
93     sampleBufferQueueConsumer_->SetBufferAvailableListener(consumerListener);
94 
95     MEDIA_LOG_I(PUBLIC_LOG_S " AVBufferQueue::Create queueSize_" PUBLIC_LOG_U32,
96         config_.queueName_.c_str(),
97         config_.queueSize_);
98     return AttachBuffer();
99 }
100 
SetLargerQueueSize(uint32_t size)101 Status SampleQueue::SetLargerQueueSize(uint32_t size)
102 {
103     if (size != config_.queueSize_) {
104         Status status = sampleBufferQueue_->SetLargerQueueSize(size);
105         FALSE_RETURN_V_MSG_E(status == Status::OK, status, "SetLargerQueueSize failed status=" PUBLIC_LOG_D32,
106             static_cast<int32_t>(status));
107         MEDIA_LOG_I("sampleBufferQueue size is change to " PUBLIC_LOG_U32, size);
108         config_.queueSize_ = size;
109     }
110     return Status::OK;
111 }
112 
AddQueueSize(uint32_t size)113 Status SampleQueue::AddQueueSize(uint32_t size)
114 {
115     Status status = sampleBufferQueue_->SetLargerQueueSize(config_.queueSize_ + size);
116     FALSE_RETURN_V(status == Status::OK, status);
117     config_.queueSize_ = config_.queueSize_ + size;
118     MEDIA_LOG_I("sampleBufferQueue size is add to " PUBLIC_LOG_U32, config_.queueSize_);
119     return Status::OK;
120 }
121 
AttachBuffer()122 Status SampleQueue::AttachBuffer()
123 {
124     for (uint32_t i = 0; i < config_.queueSize_; i++) {
125         auto avAllocator = AVAllocatorFactory::CreateVirtualAllocator();
126         std::shared_ptr<AVBuffer> buffer = AVBuffer::CreateAVBuffer(avAllocator, config_.bufferCap_);
127         FALSE_RETURN_V_MSG_E(buffer != nullptr, Status::ERROR_NO_MEMORY, "CreateAVBuffer failed");
128         Status status = sampleBufferQueueProducer_->AttachBuffer(buffer, false);
129         FALSE_RETURN_V_MSG_E(
130             status == Status::OK, status, "AttachBuffer failed status=" PUBLIC_LOG_D32, static_cast<int32_t>(status));
131     }
132     return Status::OK;
133 }
134 
SetSampleQueueCallback(std::shared_ptr<SampleQueueCallback> sampleQueueCb)135 Status SampleQueue::SetSampleQueueCallback(std::shared_ptr<SampleQueueCallback> sampleQueueCb)
136 {
137     sampleQueueCb_ = sampleQueueCb;
138     return Status::OK;
139 }
140 
GetBufferQueueProducer() const141 sptr<AVBufferQueueProducer> SampleQueue::GetBufferQueueProducer() const
142 {
143     return sampleBufferQueueProducer_;
144 }
145 
RequestBuffer(std::shared_ptr<AVBuffer> & sampleBuffer,const AVBufferConfig & config,int32_t timeoutMs)146 Status SampleQueue::RequestBuffer(
147     std::shared_ptr<AVBuffer> &sampleBuffer, const AVBufferConfig &config, int32_t timeoutMs)
148 {
149     MEDIA_TRACE_DEBUG("SampleQueue::RequestBuffer");
150     FALSE_RETURN_V(sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
151     MEDIA_LOG_DD(PUBLIC_LOG_S " sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
152         config_.queueName_.c_str(),
153         sampleBufferQueueProducer_->GetQueueSize());
154     return sampleBufferQueueProducer_->RequestBuffer(sampleBuffer, config, timeoutMs);
155 }
156 
PushBuffer(std::shared_ptr<AVBuffer> & sampleBuffer,bool available)157 Status SampleQueue::PushBuffer(std::shared_ptr<AVBuffer>& sampleBuffer, bool available)
158 {
159     MEDIA_TRACE_DEBUG("SampleQueue::PushBuffer");
160     FALSE_RETURN_V(sampleBuffer != nullptr && sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
161     MEDIA_LOG_DD(PUBLIC_LOG_S " sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
162         config_.queueName_.c_str(),
163         sampleBufferQueueProducer_->GetQueueSize());
164     Status status = sampleBufferQueueProducer_->PushBuffer(sampleBuffer, available);
165     FALSE_RETURN_V(available && status == Status::OK, status);
166 
167     lastEnterSamplePts_ = sampleBuffer->pts_;
168     MEDIA_LOG_DD(PUBLIC_LOG_S " PushBuffer pts=" PUBLIC_LOG_D64 " dts=" PUBLIC_LOG_D64 " duration=" PUBLIC_LOG_D64,
169         config_.queueName_.c_str(), sampleBuffer->pts_, sampleBuffer->dts_, sampleBuffer->duration_);
170     if (!config_.isSupportBitrateSwitch_) {
171         return Status::OK;
172     }
173 
174     if (!IsKeyFrame(sampleBuffer)) {
175         return Status::OK;
176     }
177     MEDIA_LOG_I(PUBLIC_LOG_S " insert Key Frame pts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->pts_);
178     {
179         std::lock_guard<std::mutex> ptsLock(ptsMutex_);
180         keyFramePtsSet_.insert(sampleBuffer->pts_);
181     }
182 
183     {
184         std::lock_guard<std::mutex> statusLock(statusMutex_);
185         if (IsSwitchBitrateOK()) {
186             NotifySwitchBitrateOK();
187         }
188     }
189     return Status::OK;
190 }
191 
AcquireBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)192 Status SampleQueue::AcquireBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
193 {
194     MEDIA_TRACE_DEBUG("SampleQueue::AcquireBuffer");
195     // return from rollbackBufferQueue_ first
196     if (!rollbackBufferQueue_.empty()) {
197         sampleBuffer = rollbackBufferQueue_.front();
198         rollbackBufferQueue_.pop_front();
199         MEDIA_LOG_DD(PUBLIC_LOG_S " AcquireBuffer from rollbackBufferQueue_", config_.queueName_.c_str());
200     } else {
201         FALSE_RETURN_V(sampleBufferQueueConsumer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
202         Status ret = sampleBufferQueueConsumer_->AcquireBuffer(sampleBuffer);
203         FALSE_RETURN_V_NOLOG(ret == Status::OK, ret);
204         MEDIA_LOG_DD(PUBLIC_LOG_S " bufferId: " PUBLIC_LOG_U64 ", pts: " PUBLIC_LOG_D64
205                                  " GetCacheDuration= " PUBLIC_LOG_U64 " GetFilledBufferSize= " PUBLIC_LOG_U32,
206             config_.queueName_.c_str(),
207             sampleBuffer->GetUniqueId(),
208             sampleBuffer->pts_,
209             GetCacheDuration(),
210             sampleBufferQueueConsumer_->GetFilledBufferSize());
211     }
212 
213     if (!config_.isSupportBitrateSwitch_) {
214         MEDIA_LOG_DD(PUBLIC_LOG_S " not SupportBitrateSwitch", config_.queueName_.c_str());
215         return Status::OK;
216     }
217 
218     if (IsKeyFrame(sampleBuffer)) {
219         MEDIA_LOG_I(
220             PUBLIC_LOG_S " erase Key Frame pts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->pts_);
221         std::lock_guard<std::mutex> ptsLock(ptsMutex_);
222         keyFramePtsSet_.erase(sampleBuffer->pts_);
223     }
224 
225     return Status::OK;
226 }
227 
AcquireCopyToDstBuffer(std::shared_ptr<AVBuffer> & dstBuffer)228 Status SampleQueue::AcquireCopyToDstBuffer(std::shared_ptr<AVBuffer>& dstBuffer)
229 {
230     MEDIA_TRACE_DEBUG("SampleQueue::AcquireCopyToDstBuffer");
231     MEDIA_LOG_DD(PUBLIC_LOG_S " AcquireCopyToDstBuffer in", config_.queueName_.c_str());
232     FALSE_RETURN_V(dstBuffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
233 
234     std::shared_ptr<AVBuffer> srcBuffer;
235     Status ret = AcquireBuffer(srcBuffer);
236     FALSE_RETURN_V(ret == Status::OK && srcBuffer != nullptr, ret);
237 
238     ret = CopyBuffer(srcBuffer, dstBuffer);
239     if (ret != Status::OK) {
240         MEDIA_LOG_W(PUBLIC_LOG_S " AcquireCopyToDstBuffer fail ret=" PUBLIC_LOG_D32, config_.queueName_.c_str(), ret);
241         RollbackBuffer(srcBuffer);
242         return ret;
243     }
244     UpdateLastOutSamplePts(dstBuffer->pts_);
245 
246     ret = ReleaseBuffer(srcBuffer);
247     MEDIA_LOG_DD(PUBLIC_LOG_S " AcquireCopyToDstBuffer out", config_.queueName_.c_str());
248     return ret;
249 }
250 
CopyBuffer(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)251 Status SampleQueue::CopyBuffer(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
252 {
253     // copy basic data
254     dstBuffer->pts_ = srcBuffer->pts_;
255     dstBuffer->dts_ = srcBuffer->dts_;
256     dstBuffer->duration_ = srcBuffer->duration_;
257     dstBuffer->flag_ = srcBuffer->flag_;
258 
259     CopyMeta(srcBuffer, dstBuffer);
260 
261     if (IsEosFrame(dstBuffer)) {
262         MEDIA_LOG_I(PUBLIC_LOG_S " receive  IsEosFrame", config_.queueName_.c_str());
263         return Status::OK;
264     }
265     return CopyAVMemory(srcBuffer, dstBuffer);
266 }
267 
268 
CopyMeta(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)269 void SampleQueue::CopyMeta(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
270 {
271     if (srcBuffer->meta_ == nullptr) {
272         dstBuffer->meta_ = nullptr;
273         return;
274     }
275 
276     int32_t trackId = INVALID_TRACK_ID;
277     if (!dstBuffer->meta_->GetData(Tag::REGULAR_TRACK_ID, trackId)) {
278         MEDIA_LOG_DD("trackId not found");
279     }
280 
281     dstBuffer->meta_ = std::make_shared<Meta>(*(srcBuffer->meta_));
282     if (dstBuffer->meta_ == nullptr) {
283         return;
284     }
285 
286     if (trackId != INVALID_TRACK_ID) {
287         dstBuffer->meta_->SetData(Tag::REGULAR_TRACK_ID, trackId);
288     }
289 }
290 
CopyAVMemory(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)291 Status SampleQueue::CopyAVMemory(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
292 {
293     std::shared_ptr<AVMemory>& srcMemory = srcBuffer->memory_;
294     std::shared_ptr<AVMemory>& dstMemory = dstBuffer->memory_;
295     if (!srcMemory || !dstMemory) {
296         return Status::ERROR_NULL_POINT_BUFFER;
297     }
298     if (srcMemory->GetSize() > dstMemory->GetCapacity()) {
299         MEDIA_LOG_E(PUBLIC_LOG_S " srcMemory->GetSize() " PUBLIC_LOG_U32 "dstMemory->GetCapacity()" PUBLIC_LOG_U32
300                                     " srcMemory->GetOffset()" PUBLIC_LOG_U32,
301             config_.queueName_.c_str(),
302             srcMemory->GetSize(),
303             dstMemory->GetCapacity(),
304             srcMemory->GetOffset());
305         return Status::ERROR_INVALID_BUFFER_SIZE;
306     }
307 
308     errno_t copyRet = memcpy_s(dstMemory->GetAddr(),
309         dstMemory->GetCapacity(),
310         srcMemory->GetAddr() + srcMemory->GetOffset(),
311         srcMemory->GetSize());
312     if (copyRet != EOK) {
313         return Status::ERROR_UNKNOWN;
314     }
315     dstMemory->SetSize(srcMemory->GetSize());
316     dstMemory->SetOffset(srcMemory->GetOffset());
317     return Status::OK;
318 }
319 
ReleaseBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)320 Status SampleQueue::ReleaseBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
321 {
322     MEDIA_LOG_DD(PUBLIC_LOG_S " ReleaseBuffer", config_.queueName_.c_str());
323     FALSE_RETURN_V(sampleBufferQueueConsumer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
324     Status status = sampleBufferQueueConsumer_->ReleaseBuffer(sampleBuffer);
325     FALSE_RETURN_V_MSG_E(
326         status == Status::OK, status, PUBLIC_LOG_S "ReleaseBuffer failed ", config_.queueName_.c_str());
327     MEDIA_LOG_DD(PUBLIC_LOG_S " bufferId: " PUBLIC_LOG_U64 ", pts: " PUBLIC_LOG_D64,
328         config_.queueName_.c_str(),
329         sampleBuffer->GetUniqueId(),
330         sampleBuffer->pts_);
331     return status;
332 }
333 
RollbackBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)334 Status SampleQueue::RollbackBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
335 {
336     MEDIA_LOG_DD(PUBLIC_LOG_S " RollbackBuffer", config_.queueName_.c_str());
337     rollbackBufferQueue_.push_back(sampleBuffer);
338     return Status::OK;
339 }
340 
QuerySizeForNextAcquireBuffer(size_t & size)341 Status SampleQueue::QuerySizeForNextAcquireBuffer(size_t& size)
342 {
343     std::shared_ptr<AVBuffer> sampleBuffer;
344     if (!rollbackBufferQueue_.empty()) {
345         sampleBuffer = rollbackBufferQueue_.front();
346     } else {
347         Status ret = AcquireBuffer(sampleBuffer);
348         FALSE_RETURN_V_MSG_D(
349             ret == Status::OK, ret, PUBLIC_LOG_S " failed ret=" PUBLIC_LOG_D32, config_.queueName_.c_str(), ret);
350         SampleQueue::RollbackBuffer(sampleBuffer);
351     }
352     FALSE_RETURN_V(sampleBuffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
353     size = sampleBuffer->GetConfig().capacity;
354     MEDIA_LOG_DD(PUBLIC_LOG_S " QuerySizeForNextAcquireBuffer size=" PUBLIC_LOG_ZU, config_.queueName_.c_str(), size);
355     return Status::OK;
356 }
357 
Clear()358 Status SampleQueue::Clear()
359 {
360     MEDIA_LOG_I(PUBLIC_LOG_S " SampleQueue Clear", config_.queueName_.c_str());
361     while (!rollbackBufferQueue_.empty()) {
362         auto sampleBuffer = rollbackBufferQueue_.front();
363         MEDIA_LOG_I(PUBLIC_LOG_S" clear rollbackBufferQueue_ bufferId: " PUBLIC_LOG_U64
364         ", pts: " PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->GetUniqueId(), sampleBuffer->pts_);
365         rollbackBufferQueue_.pop_front();
366         ReleaseBuffer(sampleBuffer);
367     }
368     if (sampleBufferQueueProducer_ != nullptr) {
369         sampleBufferQueueProducer_->Clear();
370     }
371     std::lock_guard<std::mutex> ptsLock(ptsMutex_);
372     keyFramePtsSet_.clear();
373     return Status::OK;
374 }
375 
DiscardSampleAfter(int64_t startPts)376 Status SampleQueue::DiscardSampleAfter(int64_t startPts)
377 {
378     MEDIA_LOG_I(PUBLIC_LOG_S "DiscardSampleAfter startPts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), startPts);
379     {
380         std::lock_guard<std::mutex> ptsLock(ptsMutex_);
381         MEDIA_LOG_I("before DiscardSampleAfter keyFramePtsSet_ =" PUBLIC_LOG_S, SetToString(keyFramePtsSet_).c_str());
382         auto it = keyFramePtsSet_.lower_bound(startPts);
383         keyFramePtsSet_.erase(it, keyFramePtsSet_.end());
384         lastEndSamplePts_ = startPts;
385     }
386     FALSE_RETURN_V(sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
387     auto isNewerSample = [startPts](const std::shared_ptr<AVBuffer>& buffer) {
388         return (buffer != nullptr) && (buffer->pts_ >= startPts);
389     };
390     return sampleBufferQueueProducer_->ClearBufferIf(isNewerSample);
391 }
392 
ReadySwitchBitrate(uint32_t bitrate)393 Status SampleQueue::ReadySwitchBitrate(uint32_t bitrate)
394 {
395     MediaAVCodec::AVCodecTrace trace("SampleQueue::ReadySwitchBitrate");
396     if (!config_.isSupportBitrateSwitch_) {
397         MEDIA_LOG_W("invalid operation for ReadySwitchBitrate=" PUBLIC_LOG_U32, bitrate);
398         return Status::ERROR_INVALID_OPERATION;
399     }
400     std::lock_guard<std::mutex> statusLock(statusMutex_);
401     if (switchStatus_ == SelectBitrateStatus::NORMAL) {
402         nextSwitchBitrate_ = bitrate;
403         switchStatus_ = SelectBitrateStatus::READY_SWITCH;
404         if (IsSwitchBitrateOK()) {
405             return NotifySwitchBitrateOK();
406         }
407     } else if (switchStatus_ == SelectBitrateStatus::READY_SWITCH) {
408         // replace the old bitrate before SWITCHING
409         MEDIA_LOG_W("replace new request bitrate from " PUBLIC_LOG_U32 " to"
410              PUBLIC_LOG_U32, nextSwitchBitrate_, bitrate);
411         nextSwitchBitrate_ = bitrate;
412     } else if (switchStatus_ == SelectBitrateStatus::SWITCHING) {
413         // incomming new bitrate just put switchBitrateWaitList_ when switching
414         std::lock_guard<std::mutex> lockList(waitListMutex_);
415         // drop the oldest bitrate in switchBitrateWaitList_
416         if (switchBitrateWaitList_.size() >= MAX_BITRATE_SWITCH_WAIT_NUMBER) {
417             uint32_t oldestBitrate = switchBitrateWaitList_.front();
418             switchBitrateWaitList_.pop_front();
419             MEDIA_LOG_I("switchBitrateWaitList_ remove oldestBitrate: " PUBLIC_LOG_U32, oldestBitrate);
420         }
421         MEDIA_LOG_I("switchBitrateWaitList_ add new bitrate: " PUBLIC_LOG_U32, bitrate);
422         switchBitrateWaitList_.push_back(bitrate);
423     }
424     return Status::OK;
425 }
426 
NotifySwitchBitrateOK()427 Status SampleQueue::NotifySwitchBitrateOK()
428 {
429     {
430         auto sampleQueueCb = sampleQueueCb_.lock();
431         FALSE_RETURN_V(sampleQueueCb != nullptr, Status::ERROR_NULL_POINT_BUFFER);
432         sampleQueueCb->OnSelectBitrateOk(startPtsToSwitch_, nextSwitchBitrate_);
433     }
434     switchStatus_ = SelectBitrateStatus::SWITCHING;
435     MEDIA_LOG_I("SelectBitrateStatus::SWITCHING for startPtsToSwitch_=" PUBLIC_LOG_D64 ",nextSwitchBitrate_="
436     PUBLIC_LOG_U32, startPtsToSwitch_, nextSwitchBitrate_);
437     return Status::OK;
438 }
439 
UpdateLastEndSamplePts(int64_t lastEndSamplePts)440 Status SampleQueue::UpdateLastEndSamplePts(int64_t lastEndSamplePts)
441 {
442     MEDIA_LOG_DD("UpdateLastEndSamplePts lastEndSamplePts=" PUBLIC_LOG_D64, lastEndSamplePts);
443     lastEndSamplePts_ = lastEndSamplePts;
444     return Status::OK;
445 }
446 
UpdateLastOutSamplePts(int64_t lastOutSamplePts)447 Status SampleQueue::UpdateLastOutSamplePts(int64_t lastOutSamplePts)
448 {
449     MEDIA_LOG_DD("UpdateLastOutSamplePts lastOutSamplePts=" PUBLIC_LOG_D64, lastOutSamplePts);
450     lastOutSamplePts_ = lastOutSamplePts;
451     return Status::OK;
452 }
453 
ResponseForSwitchDone(int64_t startPtsOnSwitch)454 Status SampleQueue::ResponseForSwitchDone(int64_t startPtsOnSwitch)
455 {
456     MEDIA_LOG_I(PUBLIC_LOG_S " ResponseForSwitchDone startPtsOnSwitch=" PUBLIC_LOG_D64,
457         config_.queueName_.c_str(),
458         startPtsOnSwitch);
459 
460     Status ret = DiscardSampleAfter(startPtsOnSwitch);
461     FALSE_RETURN_V_NOLOG(ret == Status::OK, ret);
462     {
463         std::lock_guard<std::mutex> statusLock(statusMutex_);
464         if (switchStatus_ == SelectBitrateStatus::SWITCHING) {
465             switchStatus_ = SelectBitrateStatus::NORMAL;
466         }
467         CheckSwitchBitrateWaitList();
468     }
469     return Status::OK;
470 }
471 
CheckSwitchBitrateWaitList()472 void SampleQueue::CheckSwitchBitrateWaitList()
473 {
474     std::lock_guard<std::mutex> lockList(waitListMutex_);
475     auto it = switchBitrateWaitList_.begin();
476     while (it != switchBitrateWaitList_.end()) {
477         if (*it != nextSwitchBitrate_) {
478             nextSwitchBitrate_ = *it;
479             switchStatus_ = SelectBitrateStatus::READY_SWITCH;
480             MEDIA_LOG_I("READY_SWITCH to nextSwitchBitrate_=" PUBLIC_LOG_U32, nextSwitchBitrate_);
481             switchBitrateWaitList_.erase(it);
482             break;
483         } else {
484             it = switchBitrateWaitList_.erase(it);
485         }
486     }
487 }
488 
IsSwitchBitrateOK()489 bool SampleQueue::IsSwitchBitrateOK()
490 {
491     if (switchStatus_ != SelectBitrateStatus::READY_SWITCH) {
492         return false;
493     }
494 
495     if (!IsKeyFrameAvailable()) {
496         return false;
497     }
498     int64_t cacheDiff = startPtsToSwitch_ - lastEndSamplePts_;
499     MEDIA_LOG_I("IsSwitchBitrateOK cacheDiff=" PUBLIC_LOG_D64 ", startPtsToSwitch_=" PUBLIC_LOG_D64
500                 ", lastEndSamplePts_=" PUBLIC_LOG_D64,
501         cacheDiff,
502         startPtsToSwitch_,
503         lastEndSamplePts_);
504     return true;
505 }
506 
507 
IsKeyFrameAvailable()508 bool SampleQueue::IsKeyFrameAvailable()
509 {
510     std::lock_guard<std::mutex> ptsLock(ptsMutex_);
511     MEDIA_LOG_I("keyFramePtsSet_ =" PUBLIC_LOG_S, SetToString(keyFramePtsSet_).c_str());
512     auto it = keyFramePtsSet_.lower_bound(lastEndSamplePts_ + MIN_SWITCH_BITRATE_TIME_US);
513     if (it != keyFramePtsSet_.end()) {
514         startPtsToSwitch_ = *it;
515         MEDIA_LOG_I("ok cache MIN_SWITCH_BITRATE_TIME_US with keyframe startpts=" PUBLIC_LOG_D64, startPtsToSwitch_);
516     } else if (!keyFramePtsSet_.empty()) {
517         startPtsToSwitch_ = *(keyFramePtsSet_.rbegin());
518         MEDIA_LOG_I("ok with last keyframe startpts=" PUBLIC_LOG_D64, startPtsToSwitch_);
519     } else {
520         return false;
521     }
522     return true;
523 }
524 
SetToString(std::set<int64_t> localSet)525 std::string SampleQueue::SetToString(std::set<int64_t> localSet)
526 {
527     std::stringstream ss;
528     for (auto it = localSet.begin(); it != localSet.end(); ++it) {
529         if (it != localSet.begin()) {
530             ss << " ";
531         }
532         ss << *it;
533     }
534     return ss.str();
535 }
536 
IsKeyFrame(std::shared_ptr<AVBuffer> & sampleBuffer) const537 bool SampleQueue::IsKeyFrame(std::shared_ptr<AVBuffer>& sampleBuffer) const
538 {
539     return (sampleBuffer != nullptr) &&
540            (sampleBuffer->flag_ & static_cast<uint32_t>(Plugins::AVBufferFlag::SYNC_FRAME));
541 }
542 
IsEosFrame(std::shared_ptr<AVBuffer> & sampleBuffer) const543 bool SampleQueue::IsEosFrame(std::shared_ptr<AVBuffer>& sampleBuffer) const
544 {
545     return (sampleBuffer != nullptr) && (sampleBuffer->flag_ & static_cast<uint32_t>(Plugins::AVBufferFlag::EOS));
546 }
547 
OnBufferAvailable()548 void SampleQueue::OnBufferAvailable()
549 {
550     MEDIA_LOG_DD(PUBLIC_LOG_S " OnBufferAvailable sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
551         config_.queueName_.c_str(),
552         sampleBufferQueueProducer_->GetQueueSize());
553     auto sampleQueueCb = sampleQueueCb_.lock();
554     if (sampleQueueCb != nullptr) {
555         MEDIA_LOG_D(PUBLIC_LOG_S " OnSampleQueueBufferAvailable ", config_.queueName_.c_str());
556         sampleQueueCb->OnSampleQueueBufferAvailable(config_.queueId_);
557     }
558 }
559 
OnBufferConsumer()560 void SampleQueue::OnBufferConsumer()
561 {
562     MEDIA_LOG_DD(PUBLIC_LOG_S " OnBufferConsumer ", config_.queueName_.c_str());
563     auto sampleQueueCb = sampleQueueCb_.lock();
564     if (sampleQueueCb != nullptr) {
565         MEDIA_LOG_DD(PUBLIC_LOG_S " OnSampleQueueBufferConsume ", config_.queueName_.c_str());
566         sampleQueueCb->OnSampleQueueBufferConsume(config_.queueId_);
567     }
568 }
569 
UpdateQueueId(int32_t queueId)570 void SampleQueue::UpdateQueueId(int32_t queueId)
571 {
572     MEDIA_LOG_I(PUBLIC_LOG_S " change queueId to " PUBLIC_LOG_D32, config_.queueName_.c_str(), queueId);
573     config_.queueId_ = queueId;
574 }
575 
GetCacheDuration() const576 uint64_t SampleQueue::GetCacheDuration() const
577 {
578     if (lastEnterSamplePts_ == Plugins::HST_TIME_NONE || lastOutSamplePts_ == Plugins::HST_TIME_NONE) {
579         return 0;
580     }
581     int64_t diff = lastEnterSamplePts_ - lastOutSamplePts_;
582     MEDIA_LOG_DD(PUBLIC_LOG_S " lastEnterSamplePts_=" PUBLIC_LOG_D64 " lastEndSamplePts_=" PUBLIC_LOG_D64
583         " diff=" PUBLIC_LOG_D64, config_.queueName_.c_str(), lastEnterSamplePts_, lastOutSamplePts_, diff);
584     return (diff > 0) ? static_cast<uint64_t>(diff) : 0;
585 }
586 
GetMemoryUsage()587 uint32_t SampleQueue::GetMemoryUsage()
588 {
589     FALSE_RETURN_V_MSG_E(sampleBufferQueue_ != nullptr, 0, "bufferQueue nullptr");
590     return sampleBufferQueue_->GetMemoryUsage();
591 }
592 
StringifyMeta(std::shared_ptr<Meta> & meta)593 std::string SampleQueue::StringifyMeta(std::shared_ptr<Meta> &meta)
594 {
595     FALSE_RETURN_V(meta != nullptr, "");
596     std::stringstream dumpStream;
597     for (auto iter = meta->begin(); iter != meta->end(); ++iter) {
598         switch (meta->GetValueType(iter->first)) {
599             case AnyValueType::INT32_T:
600                 dumpStream << iter->first << " = " << std::to_string(AnyCast<int32_t>(iter->second)) << " | ";
601                 break;
602             case AnyValueType::UINT32_T:
603                 dumpStream << iter->first << " = " << std::to_string(AnyCast<uint32_t>(iter->second)) << " | ";
604                 break;
605             case AnyValueType::BOOL:
606                 dumpStream << iter->first << " = " << std::to_string(AnyCast<bool>(iter->second)) << " | ";
607                 break;
608             case AnyValueType::DOUBLE:
609                 dumpStream << iter->first << " = " << std::to_string(AnyCast<double>(iter->second)) << " | ";
610                 break;
611             case AnyValueType::INT64_T:
612                 dumpStream << iter->first << " = " << std::to_string(AnyCast<int64_t>(iter->second)) << " | ";
613                 break;
614             case AnyValueType::FLOAT:
615                 dumpStream << iter->first << " = " << std::to_string(AnyCast<float>(iter->second)) << " | ";
616                 break;
617             case AnyValueType::STRING:
618                 dumpStream << iter->first << " = " << AnyCast<std::string>(iter->second) << " | ";
619                 break;
620             default:
621                 dumpStream << iter->first << " = " << "unknown type | ";
622                 break;
623         }
624     }
625     return dumpStream.str();
626 }
627 
IsEmpty()628 bool SampleQueue::IsEmpty()
629 {
630     return sampleBufferQueueConsumer_->GetFilledBufferSize() == 0;
631 }
632 } // namespace Media
633 } // namespace OHOS
634