• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define HST_LOG_TAG "SampleQueue"
17 
18 #include <sstream>
19 #include <securec.h>
20 #include "common/log.h"
21 #include "sample_queue.h"
22 #include "avcodec_trace.h"
23 
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_PLAYER, "SampleQueue" };
26 constexpr uint32_t INVALID_TRACK_ID = -1;
27 }
28 
29 namespace OHOS {
30 namespace Media {
31 
32 class SampleBufferConsumerListener : public IConsumerListener {
33 public:
SampleBufferConsumerListener(std::shared_ptr<SampleQueue> sampleQueue)34     explicit SampleBufferConsumerListener(std::shared_ptr<SampleQueue> sampleQueue)
35         : sampleQueue_(std::move(sampleQueue))
36     {}
37     virtual ~SampleBufferConsumerListener() = default;
38 
OnBufferAvailable()39     void OnBufferAvailable() override
40     {
41         if (auto sampleQueue = sampleQueue_.lock()) {
42             sampleQueue->OnBufferConsumer();
43         } else {
44             MEDIA_LOG_E("Invalid sampleQueue instance.");
45         }
46     }
47 
48 private:
49     std::weak_ptr<SampleQueue> sampleQueue_;
50 };
51 
Init(const Config & config)52 Status SampleQueue::Init(const Config& config)
53 {
54     config_ = config;
55     config_.queueSize_ = std::min(config.queueSize_, MAX_SAMPLE_QUEUE_SIZE);
56     config_.bufferCap_ = std::min(config.bufferCap_, MAX_SAMPLE_BUFFER_CAP);
57     config_.queueName_ = "SampleQueue_" + std::to_string(config_.queueId_);
58     sampleBufferQueue_ = AVBufferQueue::Create(config_.queueSize_, MemoryType::VIRTUAL_MEMORY, config_.queueName_);
59     FALSE_RETURN_V_MSG_E(sampleBufferQueue_ != nullptr, Status::ERROR_NO_MEMORY, "AVBufferQueue::Create failed");
60     if (config_.isFlvLiveStream_) {
61         config_.queueSize_ = MAX_SAMPLE_QUEUE_SIZE;
62         sampleBufferQueue_->SetLargerQueueSize(config_.queueSize_);
63     }
64 
65     sampleBufferQueueProducer_ = sampleBufferQueue_->GetProducer();
66 
67     sampleBufferQueueConsumer_ = sampleBufferQueue_->GetConsumer();
68     sptr<IConsumerListener> consumerListener = new(std::nothrow) SampleBufferConsumerListener(shared_from_this());
69     FALSE_RETURN_V_MSG_E(consumerListener != nullptr, Status::ERROR_NO_MEMORY, "SampleBufferConsumerListener nullptr");
70     sampleBufferQueueConsumer_->SetBufferAvailableListener(consumerListener);
71 
72     MEDIA_LOG_I(PUBLIC_LOG_S " AVBufferQueue::Create queueSize_" PUBLIC_LOG_U32,
73         config_.queueName_.c_str(),
74         config_.queueSize_);
75     return AttachBuffer();
76 }
77 
AttachBuffer()78 Status SampleQueue::AttachBuffer()
79 {
80     for (uint32_t i = 0; i < config_.queueSize_; i++) {
81         auto avAllocator = AVAllocatorFactory::CreateVirtualAllocator();
82         std::shared_ptr<AVBuffer> buffer = AVBuffer::CreateAVBuffer(avAllocator, config_.bufferCap_);
83         FALSE_RETURN_V_MSG_E(buffer != nullptr, Status::ERROR_NO_MEMORY, "CreateAVBuffer failed");
84         Status status = sampleBufferQueueProducer_->AttachBuffer(buffer, false);
85         FALSE_RETURN_V_MSG_E(
86             status == Status::OK, status, "AttachBuffer failed status=" PUBLIC_LOG_D32, (int32_t)status);
87     }
88     return Status::OK;
89 }
90 
SetSampleQueueCallback(std::shared_ptr<SampleQueueCallback> sampleQueueCb)91 Status SampleQueue::SetSampleQueueCallback(std::shared_ptr<SampleQueueCallback> sampleQueueCb)
92 {
93     sampleQueueCb_ = sampleQueueCb;
94     return Status::OK;
95 }
96 
GetBufferQueueProducer() const97 sptr<AVBufferQueueProducer> SampleQueue::GetBufferQueueProducer() const
98 {
99     return sampleBufferQueueProducer_;
100 }
101 
RequestBuffer(std::shared_ptr<AVBuffer> & sampleBuffer,const AVBufferConfig & config,int32_t timeoutMs)102 Status SampleQueue::RequestBuffer(
103     std::shared_ptr<AVBuffer> &sampleBuffer, const AVBufferConfig &config, int32_t timeoutMs)
104 {
105     MediaAVCodec::AVCodecTrace trace("SampleQueue::RequestBuffer");
106     FALSE_RETURN_V(sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
107     MEDIA_LOG_D(PUBLIC_LOG_S " sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
108         config_.queueName_.c_str(),
109         sampleBufferQueueProducer_->GetQueueSize());
110     return sampleBufferQueueProducer_->RequestBuffer(sampleBuffer, config, timeoutMs);
111 }
112 
PushBuffer(std::shared_ptr<AVBuffer> & sampleBuffer,bool available)113 Status SampleQueue::PushBuffer(std::shared_ptr<AVBuffer>& sampleBuffer, bool available)
114 {
115     MediaAVCodec::AVCodecTrace trace("SampleQueue::PushBuffer");
116     FALSE_RETURN_V(sampleBuffer != nullptr && sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
117     MEDIA_LOG_D(PUBLIC_LOG_S " sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
118         config_.queueName_.c_str(),
119         sampleBufferQueueProducer_->GetQueueSize());
120     Status status = sampleBufferQueueProducer_->PushBuffer(sampleBuffer, available);
121     FALSE_RETURN_V(available && status == Status::OK, status);
122 
123     lastEnterSamplePts_ = sampleBuffer->pts_;
124     MEDIA_LOG_D(PUBLIC_LOG_S " PushBuffer pts=" PUBLIC_LOG_D64 " dts=" PUBLIC_LOG_D64 " duration=" PUBLIC_LOG_D64,
125         config_.queueName_.c_str(), sampleBuffer->pts_, sampleBuffer->dts_, sampleBuffer->duration_);
126     if (!config_.isSupportBitrateSwitch_) {
127         return Status::OK;
128     }
129 
130     if (!IsKeyFrame(sampleBuffer)) {
131         return Status::OK;
132     }
133     MEDIA_LOG_I(PUBLIC_LOG_S " insert Key Frame pts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->pts_);
134     {
135         std::lock_guard<std::mutex> ptsLock(ptsMutex_);
136         keyFramePtsSet_.insert(sampleBuffer->pts_);
137     }
138 
139     {
140         std::lock_guard<std::mutex> statusLock(statusMutex_);
141         if (IsSwitchBitrateOK()) {
142             NotifySwitchBitrateOK();
143         }
144     }
145     return Status::OK;
146 }
147 
AcquireBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)148 Status SampleQueue::AcquireBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
149 {
150     MediaAVCodec::AVCodecTrace trace("SampleQueue::AcquireBuffer");
151     // return from rollbackBufferQueue_ first
152     if (!rollbackBufferQueue_.empty()) {
153         sampleBuffer = rollbackBufferQueue_.front();
154         rollbackBufferQueue_.pop_front();
155         MEDIA_LOG_I(PUBLIC_LOG_S " AcquireBuffer from rollbackBufferQueue_", config_.queueName_.c_str());
156     } else {
157         FALSE_RETURN_V(sampleBufferQueueConsumer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
158         Status ret = sampleBufferQueueConsumer_->AcquireBuffer(sampleBuffer);
159         FALSE_RETURN_V(ret == Status::OK, ret);
160         MEDIA_LOG_D(PUBLIC_LOG_S" bufferId: " PUBLIC_LOG_U64
161         ", pts: " PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->GetUniqueId(), sampleBuffer->pts_);
162     }
163 
164     if (!config_.isSupportBitrateSwitch_) {
165         MEDIA_LOG_D(PUBLIC_LOG_S " not SupportBitrateSwitch", config_.queueName_.c_str());
166         return Status::OK;
167     }
168 
169     if (IsKeyFrame(sampleBuffer)) {
170         MEDIA_LOG_I(
171             PUBLIC_LOG_S " erase Key Frame pts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->pts_);
172         std::lock_guard<std::mutex> ptsLock(ptsMutex_);
173         keyFramePtsSet_.erase(sampleBuffer->pts_);
174     }
175 
176     return Status::OK;
177 }
178 
AcquireCopyToDstBuffer(std::shared_ptr<AVBuffer> & dstBuffer)179 Status SampleQueue::AcquireCopyToDstBuffer(std::shared_ptr<AVBuffer>& dstBuffer)
180 {
181     MediaAVCodec::AVCodecTrace trace("SampleQueue::AcquireCopyToDstBuffer");
182     MEDIA_LOG_D(PUBLIC_LOG_S " AcquireCopyToDstBuffer in", config_.queueName_.c_str());
183     FALSE_RETURN_V(dstBuffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
184 
185     std::shared_ptr<AVBuffer> srcBuffer;
186     Status ret = AcquireBuffer(srcBuffer);
187     FALSE_RETURN_V(ret == Status::OK && srcBuffer != nullptr, ret);
188 
189     ret = CopyBuffer(srcBuffer, dstBuffer);
190     if (ret != Status::OK) {
191         MEDIA_LOG_W(PUBLIC_LOG_S " AcquireCopyToDstBuffer fail ret=" PUBLIC_LOG_D32, config_.queueName_.c_str(), ret);
192         RollbackBuffer(srcBuffer);
193         return ret;
194     }
195     UpdateLastOutSamplePts(dstBuffer->pts_);
196 
197     ret = ReleaseBuffer(srcBuffer);
198     if (ret == Status::OK) {
199         OnBufferAvailable();
200     }
201     MEDIA_LOG_D(PUBLIC_LOG_S " AcquireCopyToDstBuffer out", config_.queueName_.c_str());
202     return ret;
203 }
204 
CopyBuffer(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)205 Status SampleQueue::CopyBuffer(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
206 {
207     // copy basic data
208     dstBuffer->pts_ = srcBuffer->pts_;
209     dstBuffer->dts_ = srcBuffer->dts_;
210     dstBuffer->duration_ = srcBuffer->duration_;
211     dstBuffer->flag_ = srcBuffer->flag_;
212 
213     CopyMeta(srcBuffer, dstBuffer);
214 
215     if (IsEosFrame(dstBuffer)) {
216         MEDIA_LOG_I(PUBLIC_LOG_S " receive  IsEosFrame", config_.queueName_.c_str());
217         return Status::OK;
218     }
219     return CopyAVMemory(srcBuffer, dstBuffer);
220 }
221 
222 
CopyMeta(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)223 void SampleQueue::CopyMeta(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
224 {
225     if (srcBuffer->meta_ == nullptr) {
226         dstBuffer->meta_ = nullptr;
227         return;
228     }
229 
230     uint32_t trackId = INVALID_TRACK_ID;
231     if (!dstBuffer->meta_->GetData(Tag::REGULAR_TRACK_ID, trackId)) {
232         MEDIA_LOG_D("trackId not found");
233     }
234 
235     dstBuffer->meta_ = std::make_shared<Meta>(*(srcBuffer->meta_));
236     if (dstBuffer->meta_ == nullptr) {
237         return;
238     }
239 
240     if (trackId != INVALID_TRACK_ID) {
241         dstBuffer->meta_->SetData(Tag::REGULAR_TRACK_ID, trackId);
242     }
243 }
244 
CopyAVMemory(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)245 Status SampleQueue::CopyAVMemory(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
246 {
247     std::shared_ptr<AVMemory>& srcMemory = srcBuffer->memory_;
248     std::shared_ptr<AVMemory>& dstMemory = dstBuffer->memory_;
249     if (!srcMemory || !dstMemory) {
250         return Status::ERROR_NULL_POINT_BUFFER;
251     }
252     if (srcMemory->GetSize() > dstMemory->GetCapacity()) {
253         MEDIA_LOG_E(PUBLIC_LOG_S " srcMemory->GetSize() " PUBLIC_LOG_U32 "dstMemory->GetCapacity()" PUBLIC_LOG_U32
254                                     " srcMemory->GetOffset()" PUBLIC_LOG_U32,
255             config_.queueName_.c_str(),
256             srcMemory->GetSize(),
257             dstMemory->GetCapacity(),
258             srcMemory->GetOffset());
259         return Status::ERROR_INVALID_BUFFER_SIZE;
260     }
261 
262     errno_t copyRet = memcpy_s(dstMemory->GetAddr(),
263         dstMemory->GetCapacity(),
264         srcMemory->GetAddr() + srcMemory->GetOffset(),
265         srcMemory->GetSize());
266     if (copyRet != EOK) {
267         return Status::ERROR_UNKNOWN;
268     }
269     dstMemory->SetSize(srcMemory->GetSize());
270     dstMemory->SetOffset(srcMemory->GetOffset());
271     return Status::OK;
272 }
273 
ReleaseBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)274 Status SampleQueue::ReleaseBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
275 {
276     MEDIA_LOG_D(PUBLIC_LOG_S " ReleaseBuffer", config_.queueName_.c_str());
277     FALSE_RETURN_V(sampleBufferQueueConsumer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
278     Status status = sampleBufferQueueConsumer_->ReleaseBuffer(sampleBuffer);
279     FALSE_RETURN_V_MSG_E(
280         status == Status::OK, status, PUBLIC_LOG_S "ReleaseBuffer failed ", config_.queueName_.c_str());
281     MEDIA_LOG_D(PUBLIC_LOG_S " bufferId: " PUBLIC_LOG_U64 ", pts: " PUBLIC_LOG_D64,
282         config_.queueName_.c_str(),
283         sampleBuffer->GetUniqueId(),
284         sampleBuffer->pts_);
285     return status;
286 }
287 
RollbackBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)288 Status SampleQueue::RollbackBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
289 {
290     MEDIA_LOG_D(PUBLIC_LOG_S " RollbackBuffer", config_.queueName_.c_str());
291     rollbackBufferQueue_.push_back(sampleBuffer);
292     return Status::OK;
293 }
294 
QuerySizeForNextAcquireBuffer(size_t & size)295 Status SampleQueue::QuerySizeForNextAcquireBuffer(size_t& size)
296 {
297     std::shared_ptr<AVBuffer> sampleBuffer;
298     if (!rollbackBufferQueue_.empty()) {
299         sampleBuffer = rollbackBufferQueue_.front();
300     } else {
301         Status ret = AcquireBuffer(sampleBuffer);
302         FALSE_RETURN_V_MSG_D(
303             ret == Status::OK, ret, PUBLIC_LOG_S " failed ret=" PUBLIC_LOG_D32, config_.queueName_.c_str(), ret);
304         SampleQueue::RollbackBuffer(sampleBuffer);
305     }
306     FALSE_RETURN_V(sampleBuffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
307     size = sampleBuffer->GetConfig().capacity;
308     MEDIA_LOG_D(PUBLIC_LOG_S " QuerySizeForNextAcquireBuffer size=" PUBLIC_LOG_ZU, config_.queueName_.c_str(), size);
309     return Status::OK;
310 }
311 
Clear()312 Status SampleQueue::Clear()
313 {
314     MEDIA_LOG_I(PUBLIC_LOG_S " SampleQueue Clear", config_.queueName_.c_str());
315     while (!rollbackBufferQueue_.empty()) {
316         auto sampleBuffer = rollbackBufferQueue_.front();
317         MEDIA_LOG_I(PUBLIC_LOG_S" clear rollbackBufferQueue_ bufferId: " PUBLIC_LOG_U64
318         ", pts: " PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->GetUniqueId(), sampleBuffer->pts_);
319         rollbackBufferQueue_.pop_front();
320         ReleaseBuffer(sampleBuffer);
321     }
322     sampleBufferQueueProducer_->Clear();
323 
324     std::lock_guard<std::mutex> ptsLock(ptsMutex_);
325     keyFramePtsSet_.clear();
326     return Status::OK;
327 }
328 
DiscardSampleAfter(int64_t startPts)329 Status SampleQueue::DiscardSampleAfter(int64_t startPts)
330 {
331     MEDIA_LOG_I(PUBLIC_LOG_S "DiscardSampleAfter startPts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), startPts);
332     {
333         std::lock_guard<std::mutex> ptsLock(ptsMutex_);
334         MEDIA_LOG_I("before DiscardSampleAfter keyFramePtsSet_ =" PUBLIC_LOG_S, SetToString(keyFramePtsSet_).c_str());
335         auto it = keyFramePtsSet_.lower_bound(startPts);
336         keyFramePtsSet_.erase(it, keyFramePtsSet_.end());
337         lastEndSamplePts_ = startPts;
338     }
339     FALSE_RETURN_V(sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
340     auto isNewerSample = [startPts](const std::shared_ptr<AVBuffer>& buffer) {
341         return (buffer != nullptr) && (buffer->pts_ >= startPts);
342     };
343     return sampleBufferQueueProducer_->ClearBufferIf(isNewerSample);
344 }
345 
ReadySwitchBitrate(uint32_t bitrate)346 Status SampleQueue::ReadySwitchBitrate(uint32_t bitrate)
347 {
348     MediaAVCodec::AVCodecTrace trace("SampleQueue::ReadySwitchBitrate");
349     if (!config_.isSupportBitrateSwitch_) {
350         MEDIA_LOG_W("invalid operation for ReadySwitchBitrate=" PUBLIC_LOG_U32, bitrate);
351         return Status::ERROR_INVALID_OPERATION;
352     }
353     std::lock_guard<std::mutex> statusLock(statusMutex_);
354     if (switchStatus_ == SelectBitrateStatus::NORMAL) {
355         nextSwitchBitrate_ = bitrate;
356         switchStatus_ = SelectBitrateStatus::READY_SWITCH;
357         if (IsSwitchBitrateOK()) {
358             return NotifySwitchBitrateOK();
359         }
360     } else if (switchStatus_ == SelectBitrateStatus::READY_SWITCH) {
361         // replace the old bitrate before SWITCHING
362         MEDIA_LOG_W("replace new request bitrate from " PUBLIC_LOG_U32 " to"
363              PUBLIC_LOG_U32, nextSwitchBitrate_, bitrate);
364         nextSwitchBitrate_ = bitrate;
365     } else if (switchStatus_ == SelectBitrateStatus::SWITCHING) {
366         // incomming new bitrate just put switchBitrateWaitList_ when switching
367         std::lock_guard<std::mutex> lockList(waitListMutex_);
368         // drop the oldest bitrate in switchBitrateWaitList_
369         if (switchBitrateWaitList_.size() >= MAX_BITRATE_SWITCH_WAIT_NUMBER) {
370             uint32_t oldestBitrate = switchBitrateWaitList_.front();
371             switchBitrateWaitList_.pop_front();
372             MEDIA_LOG_I("switchBitrateWaitList_ remove oldestBitrate: " PUBLIC_LOG_U32, oldestBitrate);
373         }
374         MEDIA_LOG_I("switchBitrateWaitList_ add new bitrate: " PUBLIC_LOG_U32, bitrate);
375         switchBitrateWaitList_.push_back(bitrate);
376     }
377     return Status::OK;
378 }
379 
NotifySwitchBitrateOK()380 Status SampleQueue::NotifySwitchBitrateOK()
381 {
382     {
383         auto sampleQueueCb = sampleQueueCb_.lock();
384         FALSE_RETURN_V(sampleQueueCb != nullptr, Status::ERROR_NULL_POINT_BUFFER);
385         sampleQueueCb->OnSelectBitrateOk(startPtsToSwitch_, nextSwitchBitrate_);
386     }
387     switchStatus_ = SelectBitrateStatus::SWITCHING;
388     MEDIA_LOG_I("SelectBitrateStatus::SWITCHING for startPtsToSwitch_=" PUBLIC_LOG_D64 ",nextSwitchBitrate_="
389     PUBLIC_LOG_U32, startPtsToSwitch_, nextSwitchBitrate_);
390     return Status::OK;
391 }
392 
UpdateLastEndSamplePts(int64_t lastEndSamplePts)393 Status SampleQueue::UpdateLastEndSamplePts(int64_t lastEndSamplePts)
394 {
395     MEDIA_LOG_D("UpdateLastEndSamplePts lastEndSamplePts=" PUBLIC_LOG_D64, lastEndSamplePts);
396     lastEndSamplePts_ = lastEndSamplePts;
397     return Status::OK;
398 }
399 
UpdateLastOutSamplePts(int64_t lastOutSamplePts)400 Status SampleQueue::UpdateLastOutSamplePts(int64_t lastOutSamplePts)
401 {
402     MEDIA_LOG_D("UpdateLastOutSamplePts lastOutSamplePts=" PUBLIC_LOG_D64, lastOutSamplePts);
403     lastOutSamplePts_ = lastOutSamplePts;
404     return Status::OK;
405 }
406 
ResponseForSwitchDone(int64_t startPtsOnSwitch)407 Status SampleQueue::ResponseForSwitchDone(int64_t startPtsOnSwitch)
408 {
409     MEDIA_LOG_I(PUBLIC_LOG_S " ResponseForSwitchDone startPtsOnSwitch=" PUBLIC_LOG_D64,
410         config_.queueName_.c_str(),
411         startPtsOnSwitch);
412 
413     Status ret = DiscardSampleAfter(startPtsOnSwitch);
414     FALSE_RETURN_V_NOLOG(ret == Status::OK, ret);
415     {
416         std::lock_guard<std::mutex> statusLock(statusMutex_);
417         if (switchStatus_ == SelectBitrateStatus::SWITCHING) {
418             switchStatus_ = SelectBitrateStatus::NORMAL;
419         }
420         CheckSwitchBitrateWaitList();
421     }
422     return Status::OK;
423 }
424 
CheckSwitchBitrateWaitList()425 void SampleQueue::CheckSwitchBitrateWaitList()
426 {
427     std::lock_guard<std::mutex> lockList(waitListMutex_);
428     auto it = switchBitrateWaitList_.begin();
429     while (it != switchBitrateWaitList_.end()) {
430         if (*it != nextSwitchBitrate_) {
431             nextSwitchBitrate_ = *it;
432             switchStatus_ = SelectBitrateStatus::READY_SWITCH;
433             MEDIA_LOG_I("READY_SWITCH to nextSwitchBitrate_=" PUBLIC_LOG_U32, nextSwitchBitrate_);
434             switchBitrateWaitList_.erase(it);
435             break;
436         } else {
437             it = switchBitrateWaitList_.erase(it);
438         }
439     }
440 }
441 
IsSwitchBitrateOK()442 bool SampleQueue::IsSwitchBitrateOK()
443 {
444     if (switchStatus_ != SelectBitrateStatus::READY_SWITCH) {
445         return false;
446     }
447 
448     if (!IsKeyFrameAvailable()) {
449         return false;
450     }
451     int64_t cacheDiff = startPtsToSwitch_ - lastEndSamplePts_;
452     MEDIA_LOG_I("IsSwitchBitrateOK cacheDiff=" PUBLIC_LOG_D64 ", startPtsToSwitch_=" PUBLIC_LOG_D64
453                 ", lastEndSamplePts_=" PUBLIC_LOG_D64,
454         cacheDiff,
455         startPtsToSwitch_,
456         lastEndSamplePts_);
457     return true;
458 }
459 
460 
IsKeyFrameAvailable()461 bool SampleQueue::IsKeyFrameAvailable()
462 {
463     std::lock_guard<std::mutex> ptsLock(ptsMutex_);
464     MEDIA_LOG_I("keyFramePtsSet_ =" PUBLIC_LOG_S, SetToString(keyFramePtsSet_).c_str());
465     auto it = keyFramePtsSet_.lower_bound(lastEndSamplePts_ + MIN_SWITCH_BITRATE_TIME_US);
466     if (it != keyFramePtsSet_.end()) {
467         startPtsToSwitch_ = *it;
468         MEDIA_LOG_I("ok cache MIN_SWITCH_BITRATE_TIME_US with keyframe startpts=" PUBLIC_LOG_D64, startPtsToSwitch_);
469     } else if (!keyFramePtsSet_.empty()) {
470         startPtsToSwitch_ = *(keyFramePtsSet_.rbegin());
471         MEDIA_LOG_I("ok with last keyframe startpts=" PUBLIC_LOG_D64, startPtsToSwitch_);
472     } else {
473         return false;
474     }
475     return true;
476 }
477 
SetToString(std::set<int64_t> localSet)478 std::string SampleQueue::SetToString(std::set<int64_t> localSet)
479 {
480     std::stringstream ss;
481     for (auto it = localSet.begin(); it != localSet.end(); ++it) {
482         if (it != localSet.begin()) {
483             ss << " ";
484         }
485         ss << *it;
486     }
487     return ss.str();
488 }
489 
IsKeyFrame(std::shared_ptr<AVBuffer> & sampleBuffer) const490 bool SampleQueue::IsKeyFrame(std::shared_ptr<AVBuffer>& sampleBuffer) const
491 {
492     return (sampleBuffer != nullptr) &&
493            (sampleBuffer->flag_ & static_cast<uint32_t>(Plugins::AVBufferFlag::SYNC_FRAME));
494 }
495 
IsEosFrame(std::shared_ptr<AVBuffer> & sampleBuffer) const496 bool SampleQueue::IsEosFrame(std::shared_ptr<AVBuffer>& sampleBuffer) const
497 {
498     return (sampleBuffer != nullptr) && (sampleBuffer->flag_ & static_cast<uint32_t>(Plugins::AVBufferFlag::EOS));
499 }
500 
OnBufferAvailable()501 void SampleQueue::OnBufferAvailable()
502 {
503     MEDIA_LOG_D(PUBLIC_LOG_S " OnBufferAvailable sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
504         config_.queueName_.c_str(),
505         sampleBufferQueueProducer_->GetQueueSize());
506     auto sampleQueueCb = sampleQueueCb_.lock();
507     if (sampleQueueCb != nullptr) {
508         MEDIA_LOG_D(PUBLIC_LOG_S " OnSampleQueueBufferAvailable ", config_.queueName_.c_str());
509         sampleQueueCb->OnSampleQueueBufferAvailable(config_.queueId_);
510     }
511 }
512 
OnBufferConsumer()513 void SampleQueue::OnBufferConsumer()
514 {
515     MEDIA_LOG_D(PUBLIC_LOG_S " OnBufferConsumer ", config_.queueName_.c_str());
516     auto sampleQueueCb = sampleQueueCb_.lock();
517     if (sampleQueueCb != nullptr) {
518         MEDIA_LOG_D(PUBLIC_LOG_S " OnSampleQueueBufferConsume ", config_.queueName_.c_str());
519         sampleQueueCb->OnSampleQueueBufferConsume(config_.queueId_);
520     }
521 }
522 
UpdateQueueId(uint32_t queueId)523 void SampleQueue::UpdateQueueId(uint32_t queueId)
524 {
525     MEDIA_LOG_I(PUBLIC_LOG_S " change queueId to " PUBLIC_LOG_U32, config_.queueName_.c_str(), queueId);
526     config_.queueId_ = queueId;
527 }
528 
GetCacheDuration() const529 uint64_t SampleQueue::GetCacheDuration() const
530 {
531     if (lastEnterSamplePts_ == Plugins::HST_TIME_NONE || lastOutSamplePts_ == Plugins::HST_TIME_NONE) {
532         return 0;
533     }
534     int64_t diff = lastEnterSamplePts_ - lastOutSamplePts_;
535     MEDIA_LOG_D(PUBLIC_LOG_S " lastEnterSamplePts_=" PUBLIC_LOG_D64 " lastEndSamplePts_=" PUBLIC_LOG_D64
536         " diff=" PUBLIC_LOG_D64, config_.queueName_.c_str(), lastEnterSamplePts_, lastOutSamplePts_, diff);
537     return (diff > 0) ? static_cast<uint64_t>(diff) : 0;
538 }
539 
StringifyMeta(std::shared_ptr<Meta> & meta)540 std::string SampleQueue::StringifyMeta(std::shared_ptr<Meta> &meta)
541 {
542     FALSE_RETURN_V(meta != nullptr, "");
543     std::stringstream dumpStream;
544     for (auto iter = meta->begin(); iter != meta->end(); ++iter) {
545         switch (meta->GetValueType(iter->first)) {
546             case AnyValueType::INT32_T:
547                 dumpStream << iter->first << " = " << std::to_string(AnyCast<int32_t>(iter->second)) << " | ";
548                 break;
549             case AnyValueType::UINT32_T:
550                 dumpStream << iter->first << " = " << std::to_string(AnyCast<uint32_t>(iter->second)) << " | ";
551                 break;
552             case AnyValueType::BOOL:
553                 dumpStream << iter->first << " = " << std::to_string(AnyCast<bool>(iter->second)) << " | ";
554                 break;
555             case AnyValueType::DOUBLE:
556                 dumpStream << iter->first << " = " << std::to_string(AnyCast<double>(iter->second)) << " | ";
557                 break;
558             case AnyValueType::INT64_T:
559                 dumpStream << iter->first << " = " << std::to_string(AnyCast<int64_t>(iter->second)) << " | ";
560                 break;
561             case AnyValueType::FLOAT:
562                 dumpStream << iter->first << " = " << std::to_string(AnyCast<float>(iter->second)) << " | ";
563                 break;
564             case AnyValueType::STRING:
565                 dumpStream << iter->first << " = " << AnyCast<std::string>(iter->second) << " | ";
566                 break;
567             default:
568                 dumpStream << iter->first << " = " << "unknown type | ";
569                 break;
570         }
571     }
572     return dumpStream.str();
573 }
574 } // namespace Media
575 } // namespace OHOS
576