1 /*
2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "SampleQueue"
17
18 #include <sstream>
19 #include <securec.h>
20 #include "common/log.h"
21 #include "sample_queue.h"
22 #include "avcodec_trace.h"
23
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_PLAYER, "SampleQueue" };
26 constexpr uint32_t INVALID_TRACK_ID = -1;
27 }
28
29 namespace OHOS {
30 namespace Media {
31
32 class SampleBufferConsumerListener : public IConsumerListener {
33 public:
SampleBufferConsumerListener(std::shared_ptr<SampleQueue> sampleQueue)34 explicit SampleBufferConsumerListener(std::shared_ptr<SampleQueue> sampleQueue)
35 : sampleQueue_(std::move(sampleQueue))
36 {}
37 virtual ~SampleBufferConsumerListener() = default;
38
OnBufferAvailable()39 void OnBufferAvailable() override
40 {
41 if (auto sampleQueue = sampleQueue_.lock()) {
42 sampleQueue->OnBufferConsumer();
43 } else {
44 MEDIA_LOG_E("Invalid sampleQueue instance.");
45 }
46 }
47
48 private:
49 std::weak_ptr<SampleQueue> sampleQueue_;
50 };
51
Init(const Config & config)52 Status SampleQueue::Init(const Config& config)
53 {
54 config_ = config;
55 config_.queueSize_ = std::min(config.queueSize_, MAX_SAMPLE_QUEUE_SIZE);
56 config_.bufferCap_ = std::min(config.bufferCap_, MAX_SAMPLE_BUFFER_CAP);
57 config_.queueName_ = "SampleQueue_" + std::to_string(config_.queueId_);
58 sampleBufferQueue_ = AVBufferQueue::Create(config_.queueSize_, MemoryType::VIRTUAL_MEMORY, config_.queueName_);
59 FALSE_RETURN_V_MSG_E(sampleBufferQueue_ != nullptr, Status::ERROR_NO_MEMORY, "AVBufferQueue::Create failed");
60 if (config_.isFlvLiveStream_) {
61 config_.queueSize_ = MAX_SAMPLE_QUEUE_SIZE;
62 sampleBufferQueue_->SetLargerQueueSize(config_.queueSize_);
63 }
64
65 sampleBufferQueueProducer_ = sampleBufferQueue_->GetProducer();
66
67 sampleBufferQueueConsumer_ = sampleBufferQueue_->GetConsumer();
68 sptr<IConsumerListener> consumerListener = new(std::nothrow) SampleBufferConsumerListener(shared_from_this());
69 FALSE_RETURN_V_MSG_E(consumerListener != nullptr, Status::ERROR_NO_MEMORY, "SampleBufferConsumerListener nullptr");
70 sampleBufferQueueConsumer_->SetBufferAvailableListener(consumerListener);
71
72 MEDIA_LOG_I(PUBLIC_LOG_S " AVBufferQueue::Create queueSize_" PUBLIC_LOG_U32,
73 config_.queueName_.c_str(),
74 config_.queueSize_);
75 return AttachBuffer();
76 }
77
AttachBuffer()78 Status SampleQueue::AttachBuffer()
79 {
80 for (uint32_t i = 0; i < config_.queueSize_; i++) {
81 auto avAllocator = AVAllocatorFactory::CreateVirtualAllocator();
82 std::shared_ptr<AVBuffer> buffer = AVBuffer::CreateAVBuffer(avAllocator, config_.bufferCap_);
83 FALSE_RETURN_V_MSG_E(buffer != nullptr, Status::ERROR_NO_MEMORY, "CreateAVBuffer failed");
84 Status status = sampleBufferQueueProducer_->AttachBuffer(buffer, false);
85 FALSE_RETURN_V_MSG_E(
86 status == Status::OK, status, "AttachBuffer failed status=" PUBLIC_LOG_D32, (int32_t)status);
87 }
88 return Status::OK;
89 }
90
SetSampleQueueCallback(std::shared_ptr<SampleQueueCallback> sampleQueueCb)91 Status SampleQueue::SetSampleQueueCallback(std::shared_ptr<SampleQueueCallback> sampleQueueCb)
92 {
93 sampleQueueCb_ = sampleQueueCb;
94 return Status::OK;
95 }
96
GetBufferQueueProducer() const97 sptr<AVBufferQueueProducer> SampleQueue::GetBufferQueueProducer() const
98 {
99 return sampleBufferQueueProducer_;
100 }
101
RequestBuffer(std::shared_ptr<AVBuffer> & sampleBuffer,const AVBufferConfig & config,int32_t timeoutMs)102 Status SampleQueue::RequestBuffer(
103 std::shared_ptr<AVBuffer> &sampleBuffer, const AVBufferConfig &config, int32_t timeoutMs)
104 {
105 MediaAVCodec::AVCodecTrace trace("SampleQueue::RequestBuffer");
106 FALSE_RETURN_V(sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
107 MEDIA_LOG_D(PUBLIC_LOG_S " sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
108 config_.queueName_.c_str(),
109 sampleBufferQueueProducer_->GetQueueSize());
110 return sampleBufferQueueProducer_->RequestBuffer(sampleBuffer, config, timeoutMs);
111 }
112
PushBuffer(std::shared_ptr<AVBuffer> & sampleBuffer,bool available)113 Status SampleQueue::PushBuffer(std::shared_ptr<AVBuffer>& sampleBuffer, bool available)
114 {
115 MediaAVCodec::AVCodecTrace trace("SampleQueue::PushBuffer");
116 FALSE_RETURN_V(sampleBuffer != nullptr && sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
117 MEDIA_LOG_D(PUBLIC_LOG_S " sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
118 config_.queueName_.c_str(),
119 sampleBufferQueueProducer_->GetQueueSize());
120 Status status = sampleBufferQueueProducer_->PushBuffer(sampleBuffer, available);
121 FALSE_RETURN_V(available && status == Status::OK, status);
122
123 lastEnterSamplePts_ = sampleBuffer->pts_;
124 MEDIA_LOG_D(PUBLIC_LOG_S " PushBuffer pts=" PUBLIC_LOG_D64 " dts=" PUBLIC_LOG_D64 " duration=" PUBLIC_LOG_D64,
125 config_.queueName_.c_str(), sampleBuffer->pts_, sampleBuffer->dts_, sampleBuffer->duration_);
126 if (!config_.isSupportBitrateSwitch_) {
127 return Status::OK;
128 }
129
130 if (!IsKeyFrame(sampleBuffer)) {
131 return Status::OK;
132 }
133 MEDIA_LOG_I(PUBLIC_LOG_S " insert Key Frame pts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->pts_);
134 {
135 std::lock_guard<std::mutex> ptsLock(ptsMutex_);
136 keyFramePtsSet_.insert(sampleBuffer->pts_);
137 }
138
139 {
140 std::lock_guard<std::mutex> statusLock(statusMutex_);
141 if (IsSwitchBitrateOK()) {
142 NotifySwitchBitrateOK();
143 }
144 }
145 return Status::OK;
146 }
147
AcquireBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)148 Status SampleQueue::AcquireBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
149 {
150 MediaAVCodec::AVCodecTrace trace("SampleQueue::AcquireBuffer");
151 // return from rollbackBufferQueue_ first
152 if (!rollbackBufferQueue_.empty()) {
153 sampleBuffer = rollbackBufferQueue_.front();
154 rollbackBufferQueue_.pop_front();
155 MEDIA_LOG_I(PUBLIC_LOG_S " AcquireBuffer from rollbackBufferQueue_", config_.queueName_.c_str());
156 } else {
157 FALSE_RETURN_V(sampleBufferQueueConsumer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
158 Status ret = sampleBufferQueueConsumer_->AcquireBuffer(sampleBuffer);
159 FALSE_RETURN_V(ret == Status::OK, ret);
160 MEDIA_LOG_D(PUBLIC_LOG_S" bufferId: " PUBLIC_LOG_U64
161 ", pts: " PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->GetUniqueId(), sampleBuffer->pts_);
162 }
163
164 if (!config_.isSupportBitrateSwitch_) {
165 MEDIA_LOG_D(PUBLIC_LOG_S " not SupportBitrateSwitch", config_.queueName_.c_str());
166 return Status::OK;
167 }
168
169 if (IsKeyFrame(sampleBuffer)) {
170 MEDIA_LOG_I(
171 PUBLIC_LOG_S " erase Key Frame pts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->pts_);
172 std::lock_guard<std::mutex> ptsLock(ptsMutex_);
173 keyFramePtsSet_.erase(sampleBuffer->pts_);
174 }
175
176 return Status::OK;
177 }
178
AcquireCopyToDstBuffer(std::shared_ptr<AVBuffer> & dstBuffer)179 Status SampleQueue::AcquireCopyToDstBuffer(std::shared_ptr<AVBuffer>& dstBuffer)
180 {
181 MediaAVCodec::AVCodecTrace trace("SampleQueue::AcquireCopyToDstBuffer");
182 MEDIA_LOG_D(PUBLIC_LOG_S " AcquireCopyToDstBuffer in", config_.queueName_.c_str());
183 FALSE_RETURN_V(dstBuffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
184
185 std::shared_ptr<AVBuffer> srcBuffer;
186 Status ret = AcquireBuffer(srcBuffer);
187 FALSE_RETURN_V(ret == Status::OK && srcBuffer != nullptr, ret);
188
189 ret = CopyBuffer(srcBuffer, dstBuffer);
190 if (ret != Status::OK) {
191 MEDIA_LOG_W(PUBLIC_LOG_S " AcquireCopyToDstBuffer fail ret=" PUBLIC_LOG_D32, config_.queueName_.c_str(), ret);
192 RollbackBuffer(srcBuffer);
193 return ret;
194 }
195 UpdateLastOutSamplePts(dstBuffer->pts_);
196
197 ret = ReleaseBuffer(srcBuffer);
198 if (ret == Status::OK) {
199 OnBufferAvailable();
200 }
201 MEDIA_LOG_D(PUBLIC_LOG_S " AcquireCopyToDstBuffer out", config_.queueName_.c_str());
202 return ret;
203 }
204
CopyBuffer(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)205 Status SampleQueue::CopyBuffer(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
206 {
207 // copy basic data
208 dstBuffer->pts_ = srcBuffer->pts_;
209 dstBuffer->dts_ = srcBuffer->dts_;
210 dstBuffer->duration_ = srcBuffer->duration_;
211 dstBuffer->flag_ = srcBuffer->flag_;
212
213 CopyMeta(srcBuffer, dstBuffer);
214
215 if (IsEosFrame(dstBuffer)) {
216 MEDIA_LOG_I(PUBLIC_LOG_S " receive IsEosFrame", config_.queueName_.c_str());
217 return Status::OK;
218 }
219 return CopyAVMemory(srcBuffer, dstBuffer);
220 }
221
222
CopyMeta(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)223 void SampleQueue::CopyMeta(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
224 {
225 if (srcBuffer->meta_ == nullptr) {
226 dstBuffer->meta_ = nullptr;
227 return;
228 }
229
230 uint32_t trackId = INVALID_TRACK_ID;
231 if (!dstBuffer->meta_->GetData(Tag::REGULAR_TRACK_ID, trackId)) {
232 MEDIA_LOG_D("trackId not found");
233 }
234
235 dstBuffer->meta_ = std::make_shared<Meta>(*(srcBuffer->meta_));
236 if (dstBuffer->meta_ == nullptr) {
237 return;
238 }
239
240 if (trackId != INVALID_TRACK_ID) {
241 dstBuffer->meta_->SetData(Tag::REGULAR_TRACK_ID, trackId);
242 }
243 }
244
CopyAVMemory(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)245 Status SampleQueue::CopyAVMemory(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
246 {
247 std::shared_ptr<AVMemory>& srcMemory = srcBuffer->memory_;
248 std::shared_ptr<AVMemory>& dstMemory = dstBuffer->memory_;
249 if (!srcMemory || !dstMemory) {
250 return Status::ERROR_NULL_POINT_BUFFER;
251 }
252 if (srcMemory->GetSize() > dstMemory->GetCapacity()) {
253 MEDIA_LOG_E(PUBLIC_LOG_S " srcMemory->GetSize() " PUBLIC_LOG_U32 "dstMemory->GetCapacity()" PUBLIC_LOG_U32
254 " srcMemory->GetOffset()" PUBLIC_LOG_U32,
255 config_.queueName_.c_str(),
256 srcMemory->GetSize(),
257 dstMemory->GetCapacity(),
258 srcMemory->GetOffset());
259 return Status::ERROR_INVALID_BUFFER_SIZE;
260 }
261
262 errno_t copyRet = memcpy_s(dstMemory->GetAddr(),
263 dstMemory->GetCapacity(),
264 srcMemory->GetAddr() + srcMemory->GetOffset(),
265 srcMemory->GetSize());
266 if (copyRet != EOK) {
267 return Status::ERROR_UNKNOWN;
268 }
269 dstMemory->SetSize(srcMemory->GetSize());
270 dstMemory->SetOffset(srcMemory->GetOffset());
271 return Status::OK;
272 }
273
ReleaseBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)274 Status SampleQueue::ReleaseBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
275 {
276 MEDIA_LOG_D(PUBLIC_LOG_S " ReleaseBuffer", config_.queueName_.c_str());
277 FALSE_RETURN_V(sampleBufferQueueConsumer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
278 Status status = sampleBufferQueueConsumer_->ReleaseBuffer(sampleBuffer);
279 FALSE_RETURN_V_MSG_E(
280 status == Status::OK, status, PUBLIC_LOG_S "ReleaseBuffer failed ", config_.queueName_.c_str());
281 MEDIA_LOG_D(PUBLIC_LOG_S " bufferId: " PUBLIC_LOG_U64 ", pts: " PUBLIC_LOG_D64,
282 config_.queueName_.c_str(),
283 sampleBuffer->GetUniqueId(),
284 sampleBuffer->pts_);
285 return status;
286 }
287
RollbackBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)288 Status SampleQueue::RollbackBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
289 {
290 MEDIA_LOG_D(PUBLIC_LOG_S " RollbackBuffer", config_.queueName_.c_str());
291 rollbackBufferQueue_.push_back(sampleBuffer);
292 return Status::OK;
293 }
294
QuerySizeForNextAcquireBuffer(size_t & size)295 Status SampleQueue::QuerySizeForNextAcquireBuffer(size_t& size)
296 {
297 std::shared_ptr<AVBuffer> sampleBuffer;
298 if (!rollbackBufferQueue_.empty()) {
299 sampleBuffer = rollbackBufferQueue_.front();
300 } else {
301 Status ret = AcquireBuffer(sampleBuffer);
302 FALSE_RETURN_V_MSG_D(
303 ret == Status::OK, ret, PUBLIC_LOG_S " failed ret=" PUBLIC_LOG_D32, config_.queueName_.c_str(), ret);
304 SampleQueue::RollbackBuffer(sampleBuffer);
305 }
306 FALSE_RETURN_V(sampleBuffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
307 size = sampleBuffer->GetConfig().capacity;
308 MEDIA_LOG_D(PUBLIC_LOG_S " QuerySizeForNextAcquireBuffer size=" PUBLIC_LOG_ZU, config_.queueName_.c_str(), size);
309 return Status::OK;
310 }
311
Clear()312 Status SampleQueue::Clear()
313 {
314 MEDIA_LOG_I(PUBLIC_LOG_S " SampleQueue Clear", config_.queueName_.c_str());
315 while (!rollbackBufferQueue_.empty()) {
316 auto sampleBuffer = rollbackBufferQueue_.front();
317 MEDIA_LOG_I(PUBLIC_LOG_S" clear rollbackBufferQueue_ bufferId: " PUBLIC_LOG_U64
318 ", pts: " PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->GetUniqueId(), sampleBuffer->pts_);
319 rollbackBufferQueue_.pop_front();
320 ReleaseBuffer(sampleBuffer);
321 }
322 sampleBufferQueueProducer_->Clear();
323
324 std::lock_guard<std::mutex> ptsLock(ptsMutex_);
325 keyFramePtsSet_.clear();
326 return Status::OK;
327 }
328
DiscardSampleAfter(int64_t startPts)329 Status SampleQueue::DiscardSampleAfter(int64_t startPts)
330 {
331 MEDIA_LOG_I(PUBLIC_LOG_S "DiscardSampleAfter startPts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), startPts);
332 {
333 std::lock_guard<std::mutex> ptsLock(ptsMutex_);
334 MEDIA_LOG_I("before DiscardSampleAfter keyFramePtsSet_ =" PUBLIC_LOG_S, SetToString(keyFramePtsSet_).c_str());
335 auto it = keyFramePtsSet_.lower_bound(startPts);
336 keyFramePtsSet_.erase(it, keyFramePtsSet_.end());
337 lastEndSamplePts_ = startPts;
338 }
339 FALSE_RETURN_V(sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
340 auto isNewerSample = [startPts](const std::shared_ptr<AVBuffer>& buffer) {
341 return (buffer != nullptr) && (buffer->pts_ >= startPts);
342 };
343 return sampleBufferQueueProducer_->ClearBufferIf(isNewerSample);
344 }
345
ReadySwitchBitrate(uint32_t bitrate)346 Status SampleQueue::ReadySwitchBitrate(uint32_t bitrate)
347 {
348 MediaAVCodec::AVCodecTrace trace("SampleQueue::ReadySwitchBitrate");
349 if (!config_.isSupportBitrateSwitch_) {
350 MEDIA_LOG_W("invalid operation for ReadySwitchBitrate=" PUBLIC_LOG_U32, bitrate);
351 return Status::ERROR_INVALID_OPERATION;
352 }
353 std::lock_guard<std::mutex> statusLock(statusMutex_);
354 if (switchStatus_ == SelectBitrateStatus::NORMAL) {
355 nextSwitchBitrate_ = bitrate;
356 switchStatus_ = SelectBitrateStatus::READY_SWITCH;
357 if (IsSwitchBitrateOK()) {
358 return NotifySwitchBitrateOK();
359 }
360 } else if (switchStatus_ == SelectBitrateStatus::READY_SWITCH) {
361 // replace the old bitrate before SWITCHING
362 MEDIA_LOG_W("replace new request bitrate from " PUBLIC_LOG_U32 " to"
363 PUBLIC_LOG_U32, nextSwitchBitrate_, bitrate);
364 nextSwitchBitrate_ = bitrate;
365 } else if (switchStatus_ == SelectBitrateStatus::SWITCHING) {
366 // incomming new bitrate just put switchBitrateWaitList_ when switching
367 std::lock_guard<std::mutex> lockList(waitListMutex_);
368 // drop the oldest bitrate in switchBitrateWaitList_
369 if (switchBitrateWaitList_.size() >= MAX_BITRATE_SWITCH_WAIT_NUMBER) {
370 uint32_t oldestBitrate = switchBitrateWaitList_.front();
371 switchBitrateWaitList_.pop_front();
372 MEDIA_LOG_I("switchBitrateWaitList_ remove oldestBitrate: " PUBLIC_LOG_U32, oldestBitrate);
373 }
374 MEDIA_LOG_I("switchBitrateWaitList_ add new bitrate: " PUBLIC_LOG_U32, bitrate);
375 switchBitrateWaitList_.push_back(bitrate);
376 }
377 return Status::OK;
378 }
379
NotifySwitchBitrateOK()380 Status SampleQueue::NotifySwitchBitrateOK()
381 {
382 {
383 auto sampleQueueCb = sampleQueueCb_.lock();
384 FALSE_RETURN_V(sampleQueueCb != nullptr, Status::ERROR_NULL_POINT_BUFFER);
385 sampleQueueCb->OnSelectBitrateOk(startPtsToSwitch_, nextSwitchBitrate_);
386 }
387 switchStatus_ = SelectBitrateStatus::SWITCHING;
388 MEDIA_LOG_I("SelectBitrateStatus::SWITCHING for startPtsToSwitch_=" PUBLIC_LOG_D64 ",nextSwitchBitrate_="
389 PUBLIC_LOG_U32, startPtsToSwitch_, nextSwitchBitrate_);
390 return Status::OK;
391 }
392
UpdateLastEndSamplePts(int64_t lastEndSamplePts)393 Status SampleQueue::UpdateLastEndSamplePts(int64_t lastEndSamplePts)
394 {
395 MEDIA_LOG_D("UpdateLastEndSamplePts lastEndSamplePts=" PUBLIC_LOG_D64, lastEndSamplePts);
396 lastEndSamplePts_ = lastEndSamplePts;
397 return Status::OK;
398 }
399
UpdateLastOutSamplePts(int64_t lastOutSamplePts)400 Status SampleQueue::UpdateLastOutSamplePts(int64_t lastOutSamplePts)
401 {
402 MEDIA_LOG_D("UpdateLastOutSamplePts lastOutSamplePts=" PUBLIC_LOG_D64, lastOutSamplePts);
403 lastOutSamplePts_ = lastOutSamplePts;
404 return Status::OK;
405 }
406
ResponseForSwitchDone(int64_t startPtsOnSwitch)407 Status SampleQueue::ResponseForSwitchDone(int64_t startPtsOnSwitch)
408 {
409 MEDIA_LOG_I(PUBLIC_LOG_S " ResponseForSwitchDone startPtsOnSwitch=" PUBLIC_LOG_D64,
410 config_.queueName_.c_str(),
411 startPtsOnSwitch);
412
413 Status ret = DiscardSampleAfter(startPtsOnSwitch);
414 FALSE_RETURN_V_NOLOG(ret == Status::OK, ret);
415 {
416 std::lock_guard<std::mutex> statusLock(statusMutex_);
417 if (switchStatus_ == SelectBitrateStatus::SWITCHING) {
418 switchStatus_ = SelectBitrateStatus::NORMAL;
419 }
420 CheckSwitchBitrateWaitList();
421 }
422 return Status::OK;
423 }
424
CheckSwitchBitrateWaitList()425 void SampleQueue::CheckSwitchBitrateWaitList()
426 {
427 std::lock_guard<std::mutex> lockList(waitListMutex_);
428 auto it = switchBitrateWaitList_.begin();
429 while (it != switchBitrateWaitList_.end()) {
430 if (*it != nextSwitchBitrate_) {
431 nextSwitchBitrate_ = *it;
432 switchStatus_ = SelectBitrateStatus::READY_SWITCH;
433 MEDIA_LOG_I("READY_SWITCH to nextSwitchBitrate_=" PUBLIC_LOG_U32, nextSwitchBitrate_);
434 switchBitrateWaitList_.erase(it);
435 break;
436 } else {
437 it = switchBitrateWaitList_.erase(it);
438 }
439 }
440 }
441
IsSwitchBitrateOK()442 bool SampleQueue::IsSwitchBitrateOK()
443 {
444 if (switchStatus_ != SelectBitrateStatus::READY_SWITCH) {
445 return false;
446 }
447
448 if (!IsKeyFrameAvailable()) {
449 return false;
450 }
451 int64_t cacheDiff = startPtsToSwitch_ - lastEndSamplePts_;
452 MEDIA_LOG_I("IsSwitchBitrateOK cacheDiff=" PUBLIC_LOG_D64 ", startPtsToSwitch_=" PUBLIC_LOG_D64
453 ", lastEndSamplePts_=" PUBLIC_LOG_D64,
454 cacheDiff,
455 startPtsToSwitch_,
456 lastEndSamplePts_);
457 return true;
458 }
459
460
IsKeyFrameAvailable()461 bool SampleQueue::IsKeyFrameAvailable()
462 {
463 std::lock_guard<std::mutex> ptsLock(ptsMutex_);
464 MEDIA_LOG_I("keyFramePtsSet_ =" PUBLIC_LOG_S, SetToString(keyFramePtsSet_).c_str());
465 auto it = keyFramePtsSet_.lower_bound(lastEndSamplePts_ + MIN_SWITCH_BITRATE_TIME_US);
466 if (it != keyFramePtsSet_.end()) {
467 startPtsToSwitch_ = *it;
468 MEDIA_LOG_I("ok cache MIN_SWITCH_BITRATE_TIME_US with keyframe startpts=" PUBLIC_LOG_D64, startPtsToSwitch_);
469 } else if (!keyFramePtsSet_.empty()) {
470 startPtsToSwitch_ = *(keyFramePtsSet_.rbegin());
471 MEDIA_LOG_I("ok with last keyframe startpts=" PUBLIC_LOG_D64, startPtsToSwitch_);
472 } else {
473 return false;
474 }
475 return true;
476 }
477
SetToString(std::set<int64_t> localSet)478 std::string SampleQueue::SetToString(std::set<int64_t> localSet)
479 {
480 std::stringstream ss;
481 for (auto it = localSet.begin(); it != localSet.end(); ++it) {
482 if (it != localSet.begin()) {
483 ss << " ";
484 }
485 ss << *it;
486 }
487 return ss.str();
488 }
489
IsKeyFrame(std::shared_ptr<AVBuffer> & sampleBuffer) const490 bool SampleQueue::IsKeyFrame(std::shared_ptr<AVBuffer>& sampleBuffer) const
491 {
492 return (sampleBuffer != nullptr) &&
493 (sampleBuffer->flag_ & static_cast<uint32_t>(Plugins::AVBufferFlag::SYNC_FRAME));
494 }
495
IsEosFrame(std::shared_ptr<AVBuffer> & sampleBuffer) const496 bool SampleQueue::IsEosFrame(std::shared_ptr<AVBuffer>& sampleBuffer) const
497 {
498 return (sampleBuffer != nullptr) && (sampleBuffer->flag_ & static_cast<uint32_t>(Plugins::AVBufferFlag::EOS));
499 }
500
OnBufferAvailable()501 void SampleQueue::OnBufferAvailable()
502 {
503 MEDIA_LOG_D(PUBLIC_LOG_S " OnBufferAvailable sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
504 config_.queueName_.c_str(),
505 sampleBufferQueueProducer_->GetQueueSize());
506 auto sampleQueueCb = sampleQueueCb_.lock();
507 if (sampleQueueCb != nullptr) {
508 MEDIA_LOG_D(PUBLIC_LOG_S " OnSampleQueueBufferAvailable ", config_.queueName_.c_str());
509 sampleQueueCb->OnSampleQueueBufferAvailable(config_.queueId_);
510 }
511 }
512
OnBufferConsumer()513 void SampleQueue::OnBufferConsumer()
514 {
515 MEDIA_LOG_D(PUBLIC_LOG_S " OnBufferConsumer ", config_.queueName_.c_str());
516 auto sampleQueueCb = sampleQueueCb_.lock();
517 if (sampleQueueCb != nullptr) {
518 MEDIA_LOG_D(PUBLIC_LOG_S " OnSampleQueueBufferConsume ", config_.queueName_.c_str());
519 sampleQueueCb->OnSampleQueueBufferConsume(config_.queueId_);
520 }
521 }
522
UpdateQueueId(uint32_t queueId)523 void SampleQueue::UpdateQueueId(uint32_t queueId)
524 {
525 MEDIA_LOG_I(PUBLIC_LOG_S " change queueId to " PUBLIC_LOG_U32, config_.queueName_.c_str(), queueId);
526 config_.queueId_ = queueId;
527 }
528
GetCacheDuration() const529 uint64_t SampleQueue::GetCacheDuration() const
530 {
531 if (lastEnterSamplePts_ == Plugins::HST_TIME_NONE || lastOutSamplePts_ == Plugins::HST_TIME_NONE) {
532 return 0;
533 }
534 int64_t diff = lastEnterSamplePts_ - lastOutSamplePts_;
535 MEDIA_LOG_D(PUBLIC_LOG_S " lastEnterSamplePts_=" PUBLIC_LOG_D64 " lastEndSamplePts_=" PUBLIC_LOG_D64
536 " diff=" PUBLIC_LOG_D64, config_.queueName_.c_str(), lastEnterSamplePts_, lastOutSamplePts_, diff);
537 return (diff > 0) ? static_cast<uint64_t>(diff) : 0;
538 }
539
StringifyMeta(std::shared_ptr<Meta> & meta)540 std::string SampleQueue::StringifyMeta(std::shared_ptr<Meta> &meta)
541 {
542 FALSE_RETURN_V(meta != nullptr, "");
543 std::stringstream dumpStream;
544 for (auto iter = meta->begin(); iter != meta->end(); ++iter) {
545 switch (meta->GetValueType(iter->first)) {
546 case AnyValueType::INT32_T:
547 dumpStream << iter->first << " = " << std::to_string(AnyCast<int32_t>(iter->second)) << " | ";
548 break;
549 case AnyValueType::UINT32_T:
550 dumpStream << iter->first << " = " << std::to_string(AnyCast<uint32_t>(iter->second)) << " | ";
551 break;
552 case AnyValueType::BOOL:
553 dumpStream << iter->first << " = " << std::to_string(AnyCast<bool>(iter->second)) << " | ";
554 break;
555 case AnyValueType::DOUBLE:
556 dumpStream << iter->first << " = " << std::to_string(AnyCast<double>(iter->second)) << " | ";
557 break;
558 case AnyValueType::INT64_T:
559 dumpStream << iter->first << " = " << std::to_string(AnyCast<int64_t>(iter->second)) << " | ";
560 break;
561 case AnyValueType::FLOAT:
562 dumpStream << iter->first << " = " << std::to_string(AnyCast<float>(iter->second)) << " | ";
563 break;
564 case AnyValueType::STRING:
565 dumpStream << iter->first << " = " << AnyCast<std::string>(iter->second) << " | ";
566 break;
567 default:
568 dumpStream << iter->first << " = " << "unknown type | ";
569 break;
570 }
571 }
572 return dumpStream.str();
573 }
574 } // namespace Media
575 } // namespace OHOS
576