1 /*
2 * Copyright (c) 2023-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define HST_LOG_TAG "SampleQueue"
17
18 #include <sstream>
19 #include <securec.h>
20 #include "common/log.h"
21 #include "sample_queue.h"
22 #include "avcodec_trace.h"
23
24 namespace {
25 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_PLAYER, "SampleQueue" };
26 constexpr int32_t INVALID_TRACK_ID = -1;
27 }
28
29 namespace OHOS {
30 namespace Media {
31
32 class SampleBufferConsumerListener : public IConsumerListener {
33 public:
SampleBufferConsumerListener(std::shared_ptr<SampleQueue> sampleQueue)34 explicit SampleBufferConsumerListener(std::shared_ptr<SampleQueue> sampleQueue)
35 : sampleQueue_(std::move(sampleQueue))
36 {}
37 virtual ~SampleBufferConsumerListener() = default;
38
OnBufferAvailable()39 void OnBufferAvailable() override
40 {
41 if (auto sampleQueue = sampleQueue_.lock()) {
42 sampleQueue->OnBufferConsumer();
43 } else {
44 MEDIA_LOG_E("consumer listener: Invalid sampleQueue instance.");
45 }
46 }
47
48 private:
49 std::weak_ptr<SampleQueue> sampleQueue_;
50 };
51
52 class SampleBufferProducerListener : public IRemoteStub<IProducerListener> {
53 public:
SampleBufferProducerListener(std::shared_ptr<SampleQueue> sampleQueue)54 explicit SampleBufferProducerListener(std::shared_ptr<SampleQueue> sampleQueue)
55 : sampleQueue_(std::move(sampleQueue))
56 {}
57 virtual ~SampleBufferProducerListener() = default;
58
OnBufferAvailable()59 void OnBufferAvailable() override
60 {
61 if (auto sampleQueue = sampleQueue_.lock()) {
62 sampleQueue->OnBufferAvailable();
63 } else {
64 MEDIA_LOG_E("prodecer listener: Invalid sampleQueue instance.");
65 }
66 }
67
68 private:
69 std::weak_ptr<SampleQueue> sampleQueue_;
70 };
71
Init(const Config & config)72 Status SampleQueue::Init(const Config& config)
73 {
74 config_ = config;
75 config_.queueSize_ = std::min(config.queueSize_, MAX_SAMPLE_QUEUE_SIZE);
76 config_.bufferCap_ = std::min(config.bufferCap_, MAX_SAMPLE_BUFFER_CAP);
77 config_.queueName_ = "SampleQueue_" + std::to_string(config_.queueId_);
78 sampleBufferQueue_ = AVBufferQueue::Create(config_.queueSize_, MemoryType::VIRTUAL_MEMORY, config_.queueName_);
79 FALSE_RETURN_V_MSG_E(sampleBufferQueue_ != nullptr, Status::ERROR_NO_MEMORY, "AVBufferQueue::Create failed");
80 if (config_.isFlvLiveStream_) {
81 config_.queueSize_ = MAX_SAMPLE_QUEUE_SIZE;
82 sampleBufferQueue_->SetLargerQueueSize(config_.queueSize_);
83 }
84
85 sampleBufferQueueProducer_ = sampleBufferQueue_->GetProducer();
86 sptr<IProducerListener> producerListener = OHOS::sptr<SampleBufferProducerListener>::MakeSptr(shared_from_this());
87 FALSE_RETURN_V_MSG_E(producerListener != nullptr, Status::ERROR_NO_MEMORY, "SampleBufferProducerListener nullptr");
88 sampleBufferQueueProducer_->SetBufferAvailableListener(producerListener);
89
90 sampleBufferQueueConsumer_ = sampleBufferQueue_->GetConsumer();
91 sptr<IConsumerListener> consumerListener = new(std::nothrow) SampleBufferConsumerListener(shared_from_this());
92 FALSE_RETURN_V_MSG_E(consumerListener != nullptr, Status::ERROR_NO_MEMORY, "SampleBufferConsumerListener nullptr");
93 sampleBufferQueueConsumer_->SetBufferAvailableListener(consumerListener);
94
95 MEDIA_LOG_I(PUBLIC_LOG_S " AVBufferQueue::Create queueSize_" PUBLIC_LOG_U32,
96 config_.queueName_.c_str(),
97 config_.queueSize_);
98 return AttachBuffer();
99 }
100
SetLargerQueueSize(uint32_t size)101 Status SampleQueue::SetLargerQueueSize(uint32_t size)
102 {
103 if (size != config_.queueSize_) {
104 Status status = sampleBufferQueue_->SetLargerQueueSize(size);
105 FALSE_RETURN_V_MSG_E(status == Status::OK, status, "SetLargerQueueSize failed status=" PUBLIC_LOG_D32,
106 static_cast<int32_t>(status));
107 MEDIA_LOG_I("sampleBufferQueue size is change to " PUBLIC_LOG_U32, size);
108 config_.queueSize_ = size;
109 }
110 return Status::OK;
111 }
112
AddQueueSize(uint32_t size)113 Status SampleQueue::AddQueueSize(uint32_t size)
114 {
115 Status status = sampleBufferQueue_->SetLargerQueueSize(config_.queueSize_ + size);
116 FALSE_RETURN_V(status == Status::OK, status);
117 config_.queueSize_ = config_.queueSize_ + size;
118 MEDIA_LOG_I("sampleBufferQueue size is add to " PUBLIC_LOG_U32, config_.queueSize_);
119 return Status::OK;
120 }
121
AttachBuffer()122 Status SampleQueue::AttachBuffer()
123 {
124 for (uint32_t i = 0; i < config_.queueSize_; i++) {
125 auto avAllocator = AVAllocatorFactory::CreateVirtualAllocator();
126 std::shared_ptr<AVBuffer> buffer = AVBuffer::CreateAVBuffer(avAllocator, config_.bufferCap_);
127 FALSE_RETURN_V_MSG_E(buffer != nullptr, Status::ERROR_NO_MEMORY, "CreateAVBuffer failed");
128 Status status = sampleBufferQueueProducer_->AttachBuffer(buffer, false);
129 FALSE_RETURN_V_MSG_E(
130 status == Status::OK, status, "AttachBuffer failed status=" PUBLIC_LOG_D32, static_cast<int32_t>(status));
131 }
132 return Status::OK;
133 }
134
SetSampleQueueCallback(std::shared_ptr<SampleQueueCallback> sampleQueueCb)135 Status SampleQueue::SetSampleQueueCallback(std::shared_ptr<SampleQueueCallback> sampleQueueCb)
136 {
137 sampleQueueCb_ = sampleQueueCb;
138 return Status::OK;
139 }
140
GetBufferQueueProducer() const141 sptr<AVBufferQueueProducer> SampleQueue::GetBufferQueueProducer() const
142 {
143 return sampleBufferQueueProducer_;
144 }
145
RequestBuffer(std::shared_ptr<AVBuffer> & sampleBuffer,const AVBufferConfig & config,int32_t timeoutMs)146 Status SampleQueue::RequestBuffer(
147 std::shared_ptr<AVBuffer> &sampleBuffer, const AVBufferConfig &config, int32_t timeoutMs)
148 {
149 MEDIA_TRACE_DEBUG("SampleQueue::RequestBuffer");
150 FALSE_RETURN_V(sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
151 MEDIA_LOG_DD(PUBLIC_LOG_S " sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
152 config_.queueName_.c_str(),
153 sampleBufferQueueProducer_->GetQueueSize());
154 return sampleBufferQueueProducer_->RequestBuffer(sampleBuffer, config, timeoutMs);
155 }
156
PushBuffer(std::shared_ptr<AVBuffer> & sampleBuffer,bool available)157 Status SampleQueue::PushBuffer(std::shared_ptr<AVBuffer>& sampleBuffer, bool available)
158 {
159 MEDIA_TRACE_DEBUG("SampleQueue::PushBuffer");
160 FALSE_RETURN_V(sampleBuffer != nullptr && sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
161 MEDIA_LOG_DD(PUBLIC_LOG_S " sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
162 config_.queueName_.c_str(),
163 sampleBufferQueueProducer_->GetQueueSize());
164 Status status = sampleBufferQueueProducer_->PushBuffer(sampleBuffer, available);
165 FALSE_RETURN_V(available && status == Status::OK, status);
166
167 lastEnterSamplePts_ = sampleBuffer->pts_;
168 MEDIA_LOG_DD(PUBLIC_LOG_S " PushBuffer pts=" PUBLIC_LOG_D64 " dts=" PUBLIC_LOG_D64 " duration=" PUBLIC_LOG_D64,
169 config_.queueName_.c_str(), sampleBuffer->pts_, sampleBuffer->dts_, sampleBuffer->duration_);
170 if (!config_.isSupportBitrateSwitch_) {
171 return Status::OK;
172 }
173
174 if (!IsKeyFrame(sampleBuffer)) {
175 return Status::OK;
176 }
177 MEDIA_LOG_I(PUBLIC_LOG_S " insert Key Frame pts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->pts_);
178 {
179 std::lock_guard<std::mutex> ptsLock(ptsMutex_);
180 keyFramePtsSet_.insert(sampleBuffer->pts_);
181 }
182
183 {
184 std::lock_guard<std::mutex> statusLock(statusMutex_);
185 if (IsSwitchBitrateOK()) {
186 NotifySwitchBitrateOK();
187 }
188 }
189 return Status::OK;
190 }
191
AcquireBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)192 Status SampleQueue::AcquireBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
193 {
194 MEDIA_TRACE_DEBUG("SampleQueue::AcquireBuffer");
195 // return from rollbackBufferQueue_ first
196 if (!rollbackBufferQueue_.empty()) {
197 sampleBuffer = rollbackBufferQueue_.front();
198 rollbackBufferQueue_.pop_front();
199 MEDIA_LOG_DD(PUBLIC_LOG_S " AcquireBuffer from rollbackBufferQueue_", config_.queueName_.c_str());
200 } else {
201 FALSE_RETURN_V(sampleBufferQueueConsumer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
202 Status ret = sampleBufferQueueConsumer_->AcquireBuffer(sampleBuffer);
203 FALSE_RETURN_V_NOLOG(ret == Status::OK, ret);
204 MEDIA_LOG_DD(PUBLIC_LOG_S " bufferId: " PUBLIC_LOG_U64 ", pts: " PUBLIC_LOG_D64
205 " GetCacheDuration= " PUBLIC_LOG_U64 " GetFilledBufferSize= " PUBLIC_LOG_U32,
206 config_.queueName_.c_str(),
207 sampleBuffer->GetUniqueId(),
208 sampleBuffer->pts_,
209 GetCacheDuration(),
210 sampleBufferQueueConsumer_->GetFilledBufferSize());
211 }
212
213 if (!config_.isSupportBitrateSwitch_) {
214 MEDIA_LOG_DD(PUBLIC_LOG_S " not SupportBitrateSwitch", config_.queueName_.c_str());
215 return Status::OK;
216 }
217
218 if (IsKeyFrame(sampleBuffer)) {
219 MEDIA_LOG_I(
220 PUBLIC_LOG_S " erase Key Frame pts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->pts_);
221 std::lock_guard<std::mutex> ptsLock(ptsMutex_);
222 keyFramePtsSet_.erase(sampleBuffer->pts_);
223 }
224
225 return Status::OK;
226 }
227
AcquireCopyToDstBuffer(std::shared_ptr<AVBuffer> & dstBuffer)228 Status SampleQueue::AcquireCopyToDstBuffer(std::shared_ptr<AVBuffer>& dstBuffer)
229 {
230 MEDIA_TRACE_DEBUG("SampleQueue::AcquireCopyToDstBuffer");
231 MEDIA_LOG_DD(PUBLIC_LOG_S " AcquireCopyToDstBuffer in", config_.queueName_.c_str());
232 FALSE_RETURN_V(dstBuffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
233
234 std::shared_ptr<AVBuffer> srcBuffer;
235 Status ret = AcquireBuffer(srcBuffer);
236 FALSE_RETURN_V(ret == Status::OK && srcBuffer != nullptr, ret);
237
238 ret = CopyBuffer(srcBuffer, dstBuffer);
239 if (ret != Status::OK) {
240 MEDIA_LOG_W(PUBLIC_LOG_S " AcquireCopyToDstBuffer fail ret=" PUBLIC_LOG_D32, config_.queueName_.c_str(), ret);
241 RollbackBuffer(srcBuffer);
242 return ret;
243 }
244 UpdateLastOutSamplePts(dstBuffer->pts_);
245
246 ret = ReleaseBuffer(srcBuffer);
247 MEDIA_LOG_DD(PUBLIC_LOG_S " AcquireCopyToDstBuffer out", config_.queueName_.c_str());
248 return ret;
249 }
250
CopyBuffer(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)251 Status SampleQueue::CopyBuffer(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
252 {
253 // copy basic data
254 dstBuffer->pts_ = srcBuffer->pts_;
255 dstBuffer->dts_ = srcBuffer->dts_;
256 dstBuffer->duration_ = srcBuffer->duration_;
257 dstBuffer->flag_ = srcBuffer->flag_;
258
259 CopyMeta(srcBuffer, dstBuffer);
260
261 if (IsEosFrame(dstBuffer)) {
262 MEDIA_LOG_I(PUBLIC_LOG_S " receive IsEosFrame", config_.queueName_.c_str());
263 return Status::OK;
264 }
265 return CopyAVMemory(srcBuffer, dstBuffer);
266 }
267
268
CopyMeta(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)269 void SampleQueue::CopyMeta(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
270 {
271 if (srcBuffer->meta_ == nullptr) {
272 dstBuffer->meta_ = nullptr;
273 return;
274 }
275
276 int32_t trackId = INVALID_TRACK_ID;
277 if (!dstBuffer->meta_->GetData(Tag::REGULAR_TRACK_ID, trackId)) {
278 MEDIA_LOG_DD("trackId not found");
279 }
280
281 dstBuffer->meta_ = std::make_shared<Meta>(*(srcBuffer->meta_));
282 if (dstBuffer->meta_ == nullptr) {
283 return;
284 }
285
286 if (trackId != INVALID_TRACK_ID) {
287 dstBuffer->meta_->SetData(Tag::REGULAR_TRACK_ID, trackId);
288 }
289 }
290
CopyAVMemory(std::shared_ptr<AVBuffer> & srcBuffer,std::shared_ptr<AVBuffer> & dstBuffer)291 Status SampleQueue::CopyAVMemory(std::shared_ptr<AVBuffer>& srcBuffer, std::shared_ptr<AVBuffer>& dstBuffer)
292 {
293 std::shared_ptr<AVMemory>& srcMemory = srcBuffer->memory_;
294 std::shared_ptr<AVMemory>& dstMemory = dstBuffer->memory_;
295 if (!srcMemory || !dstMemory) {
296 return Status::ERROR_NULL_POINT_BUFFER;
297 }
298 if (srcMemory->GetSize() > dstMemory->GetCapacity()) {
299 MEDIA_LOG_E(PUBLIC_LOG_S " srcMemory->GetSize() " PUBLIC_LOG_U32 "dstMemory->GetCapacity()" PUBLIC_LOG_U32
300 " srcMemory->GetOffset()" PUBLIC_LOG_U32,
301 config_.queueName_.c_str(),
302 srcMemory->GetSize(),
303 dstMemory->GetCapacity(),
304 srcMemory->GetOffset());
305 return Status::ERROR_INVALID_BUFFER_SIZE;
306 }
307
308 errno_t copyRet = memcpy_s(dstMemory->GetAddr(),
309 dstMemory->GetCapacity(),
310 srcMemory->GetAddr() + srcMemory->GetOffset(),
311 srcMemory->GetSize());
312 if (copyRet != EOK) {
313 return Status::ERROR_UNKNOWN;
314 }
315 dstMemory->SetSize(srcMemory->GetSize());
316 dstMemory->SetOffset(srcMemory->GetOffset());
317 return Status::OK;
318 }
319
ReleaseBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)320 Status SampleQueue::ReleaseBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
321 {
322 MEDIA_LOG_DD(PUBLIC_LOG_S " ReleaseBuffer", config_.queueName_.c_str());
323 FALSE_RETURN_V(sampleBufferQueueConsumer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
324 Status status = sampleBufferQueueConsumer_->ReleaseBuffer(sampleBuffer);
325 FALSE_RETURN_V_MSG_E(
326 status == Status::OK, status, PUBLIC_LOG_S "ReleaseBuffer failed ", config_.queueName_.c_str());
327 MEDIA_LOG_DD(PUBLIC_LOG_S " bufferId: " PUBLIC_LOG_U64 ", pts: " PUBLIC_LOG_D64,
328 config_.queueName_.c_str(),
329 sampleBuffer->GetUniqueId(),
330 sampleBuffer->pts_);
331 return status;
332 }
333
RollbackBuffer(std::shared_ptr<AVBuffer> & sampleBuffer)334 Status SampleQueue::RollbackBuffer(std::shared_ptr<AVBuffer>& sampleBuffer)
335 {
336 MEDIA_LOG_DD(PUBLIC_LOG_S " RollbackBuffer", config_.queueName_.c_str());
337 rollbackBufferQueue_.push_back(sampleBuffer);
338 return Status::OK;
339 }
340
QuerySizeForNextAcquireBuffer(size_t & size)341 Status SampleQueue::QuerySizeForNextAcquireBuffer(size_t& size)
342 {
343 std::shared_ptr<AVBuffer> sampleBuffer;
344 if (!rollbackBufferQueue_.empty()) {
345 sampleBuffer = rollbackBufferQueue_.front();
346 } else {
347 Status ret = AcquireBuffer(sampleBuffer);
348 FALSE_RETURN_V_MSG_D(
349 ret == Status::OK, ret, PUBLIC_LOG_S " failed ret=" PUBLIC_LOG_D32, config_.queueName_.c_str(), ret);
350 SampleQueue::RollbackBuffer(sampleBuffer);
351 }
352 FALSE_RETURN_V(sampleBuffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
353 size = sampleBuffer->GetConfig().capacity;
354 MEDIA_LOG_DD(PUBLIC_LOG_S " QuerySizeForNextAcquireBuffer size=" PUBLIC_LOG_ZU, config_.queueName_.c_str(), size);
355 return Status::OK;
356 }
357
Clear()358 Status SampleQueue::Clear()
359 {
360 MEDIA_LOG_I(PUBLIC_LOG_S " SampleQueue Clear", config_.queueName_.c_str());
361 while (!rollbackBufferQueue_.empty()) {
362 auto sampleBuffer = rollbackBufferQueue_.front();
363 MEDIA_LOG_I(PUBLIC_LOG_S" clear rollbackBufferQueue_ bufferId: " PUBLIC_LOG_U64
364 ", pts: " PUBLIC_LOG_D64, config_.queueName_.c_str(), sampleBuffer->GetUniqueId(), sampleBuffer->pts_);
365 rollbackBufferQueue_.pop_front();
366 ReleaseBuffer(sampleBuffer);
367 }
368 if (sampleBufferQueueProducer_ != nullptr) {
369 sampleBufferQueueProducer_->Clear();
370 }
371 std::lock_guard<std::mutex> ptsLock(ptsMutex_);
372 keyFramePtsSet_.clear();
373 return Status::OK;
374 }
375
DiscardSampleAfter(int64_t startPts)376 Status SampleQueue::DiscardSampleAfter(int64_t startPts)
377 {
378 MEDIA_LOG_I(PUBLIC_LOG_S "DiscardSampleAfter startPts=" PUBLIC_LOG_D64, config_.queueName_.c_str(), startPts);
379 {
380 std::lock_guard<std::mutex> ptsLock(ptsMutex_);
381 MEDIA_LOG_I("before DiscardSampleAfter keyFramePtsSet_ =" PUBLIC_LOG_S, SetToString(keyFramePtsSet_).c_str());
382 auto it = keyFramePtsSet_.lower_bound(startPts);
383 keyFramePtsSet_.erase(it, keyFramePtsSet_.end());
384 lastEndSamplePts_ = startPts;
385 }
386 FALSE_RETURN_V(sampleBufferQueueProducer_ != nullptr, Status::ERROR_NULL_POINT_BUFFER);
387 auto isNewerSample = [startPts](const std::shared_ptr<AVBuffer>& buffer) {
388 return (buffer != nullptr) && (buffer->pts_ >= startPts);
389 };
390 return sampleBufferQueueProducer_->ClearBufferIf(isNewerSample);
391 }
392
ReadySwitchBitrate(uint32_t bitrate)393 Status SampleQueue::ReadySwitchBitrate(uint32_t bitrate)
394 {
395 MediaAVCodec::AVCodecTrace trace("SampleQueue::ReadySwitchBitrate");
396 if (!config_.isSupportBitrateSwitch_) {
397 MEDIA_LOG_W("invalid operation for ReadySwitchBitrate=" PUBLIC_LOG_U32, bitrate);
398 return Status::ERROR_INVALID_OPERATION;
399 }
400 std::lock_guard<std::mutex> statusLock(statusMutex_);
401 if (switchStatus_ == SelectBitrateStatus::NORMAL) {
402 nextSwitchBitrate_ = bitrate;
403 switchStatus_ = SelectBitrateStatus::READY_SWITCH;
404 if (IsSwitchBitrateOK()) {
405 return NotifySwitchBitrateOK();
406 }
407 } else if (switchStatus_ == SelectBitrateStatus::READY_SWITCH) {
408 // replace the old bitrate before SWITCHING
409 MEDIA_LOG_W("replace new request bitrate from " PUBLIC_LOG_U32 " to"
410 PUBLIC_LOG_U32, nextSwitchBitrate_, bitrate);
411 nextSwitchBitrate_ = bitrate;
412 } else if (switchStatus_ == SelectBitrateStatus::SWITCHING) {
413 // incomming new bitrate just put switchBitrateWaitList_ when switching
414 std::lock_guard<std::mutex> lockList(waitListMutex_);
415 // drop the oldest bitrate in switchBitrateWaitList_
416 if (switchBitrateWaitList_.size() >= MAX_BITRATE_SWITCH_WAIT_NUMBER) {
417 uint32_t oldestBitrate = switchBitrateWaitList_.front();
418 switchBitrateWaitList_.pop_front();
419 MEDIA_LOG_I("switchBitrateWaitList_ remove oldestBitrate: " PUBLIC_LOG_U32, oldestBitrate);
420 }
421 MEDIA_LOG_I("switchBitrateWaitList_ add new bitrate: " PUBLIC_LOG_U32, bitrate);
422 switchBitrateWaitList_.push_back(bitrate);
423 }
424 return Status::OK;
425 }
426
NotifySwitchBitrateOK()427 Status SampleQueue::NotifySwitchBitrateOK()
428 {
429 {
430 auto sampleQueueCb = sampleQueueCb_.lock();
431 FALSE_RETURN_V(sampleQueueCb != nullptr, Status::ERROR_NULL_POINT_BUFFER);
432 sampleQueueCb->OnSelectBitrateOk(startPtsToSwitch_, nextSwitchBitrate_);
433 }
434 switchStatus_ = SelectBitrateStatus::SWITCHING;
435 MEDIA_LOG_I("SelectBitrateStatus::SWITCHING for startPtsToSwitch_=" PUBLIC_LOG_D64 ",nextSwitchBitrate_="
436 PUBLIC_LOG_U32, startPtsToSwitch_, nextSwitchBitrate_);
437 return Status::OK;
438 }
439
UpdateLastEndSamplePts(int64_t lastEndSamplePts)440 Status SampleQueue::UpdateLastEndSamplePts(int64_t lastEndSamplePts)
441 {
442 MEDIA_LOG_DD("UpdateLastEndSamplePts lastEndSamplePts=" PUBLIC_LOG_D64, lastEndSamplePts);
443 lastEndSamplePts_ = lastEndSamplePts;
444 return Status::OK;
445 }
446
UpdateLastOutSamplePts(int64_t lastOutSamplePts)447 Status SampleQueue::UpdateLastOutSamplePts(int64_t lastOutSamplePts)
448 {
449 MEDIA_LOG_DD("UpdateLastOutSamplePts lastOutSamplePts=" PUBLIC_LOG_D64, lastOutSamplePts);
450 lastOutSamplePts_ = lastOutSamplePts;
451 return Status::OK;
452 }
453
ResponseForSwitchDone(int64_t startPtsOnSwitch)454 Status SampleQueue::ResponseForSwitchDone(int64_t startPtsOnSwitch)
455 {
456 MEDIA_LOG_I(PUBLIC_LOG_S " ResponseForSwitchDone startPtsOnSwitch=" PUBLIC_LOG_D64,
457 config_.queueName_.c_str(),
458 startPtsOnSwitch);
459
460 Status ret = DiscardSampleAfter(startPtsOnSwitch);
461 FALSE_RETURN_V_NOLOG(ret == Status::OK, ret);
462 {
463 std::lock_guard<std::mutex> statusLock(statusMutex_);
464 if (switchStatus_ == SelectBitrateStatus::SWITCHING) {
465 switchStatus_ = SelectBitrateStatus::NORMAL;
466 }
467 CheckSwitchBitrateWaitList();
468 }
469 return Status::OK;
470 }
471
CheckSwitchBitrateWaitList()472 void SampleQueue::CheckSwitchBitrateWaitList()
473 {
474 std::lock_guard<std::mutex> lockList(waitListMutex_);
475 auto it = switchBitrateWaitList_.begin();
476 while (it != switchBitrateWaitList_.end()) {
477 if (*it != nextSwitchBitrate_) {
478 nextSwitchBitrate_ = *it;
479 switchStatus_ = SelectBitrateStatus::READY_SWITCH;
480 MEDIA_LOG_I("READY_SWITCH to nextSwitchBitrate_=" PUBLIC_LOG_U32, nextSwitchBitrate_);
481 switchBitrateWaitList_.erase(it);
482 break;
483 } else {
484 it = switchBitrateWaitList_.erase(it);
485 }
486 }
487 }
488
IsSwitchBitrateOK()489 bool SampleQueue::IsSwitchBitrateOK()
490 {
491 if (switchStatus_ != SelectBitrateStatus::READY_SWITCH) {
492 return false;
493 }
494
495 if (!IsKeyFrameAvailable()) {
496 return false;
497 }
498 int64_t cacheDiff = startPtsToSwitch_ - lastEndSamplePts_;
499 MEDIA_LOG_I("IsSwitchBitrateOK cacheDiff=" PUBLIC_LOG_D64 ", startPtsToSwitch_=" PUBLIC_LOG_D64
500 ", lastEndSamplePts_=" PUBLIC_LOG_D64,
501 cacheDiff,
502 startPtsToSwitch_,
503 lastEndSamplePts_);
504 return true;
505 }
506
507
IsKeyFrameAvailable()508 bool SampleQueue::IsKeyFrameAvailable()
509 {
510 std::lock_guard<std::mutex> ptsLock(ptsMutex_);
511 MEDIA_LOG_I("keyFramePtsSet_ =" PUBLIC_LOG_S, SetToString(keyFramePtsSet_).c_str());
512 auto it = keyFramePtsSet_.lower_bound(lastEndSamplePts_ + MIN_SWITCH_BITRATE_TIME_US);
513 if (it != keyFramePtsSet_.end()) {
514 startPtsToSwitch_ = *it;
515 MEDIA_LOG_I("ok cache MIN_SWITCH_BITRATE_TIME_US with keyframe startpts=" PUBLIC_LOG_D64, startPtsToSwitch_);
516 } else if (!keyFramePtsSet_.empty()) {
517 startPtsToSwitch_ = *(keyFramePtsSet_.rbegin());
518 MEDIA_LOG_I("ok with last keyframe startpts=" PUBLIC_LOG_D64, startPtsToSwitch_);
519 } else {
520 return false;
521 }
522 return true;
523 }
524
SetToString(std::set<int64_t> localSet)525 std::string SampleQueue::SetToString(std::set<int64_t> localSet)
526 {
527 std::stringstream ss;
528 for (auto it = localSet.begin(); it != localSet.end(); ++it) {
529 if (it != localSet.begin()) {
530 ss << " ";
531 }
532 ss << *it;
533 }
534 return ss.str();
535 }
536
IsKeyFrame(std::shared_ptr<AVBuffer> & sampleBuffer) const537 bool SampleQueue::IsKeyFrame(std::shared_ptr<AVBuffer>& sampleBuffer) const
538 {
539 return (sampleBuffer != nullptr) &&
540 (sampleBuffer->flag_ & static_cast<uint32_t>(Plugins::AVBufferFlag::SYNC_FRAME));
541 }
542
IsEosFrame(std::shared_ptr<AVBuffer> & sampleBuffer) const543 bool SampleQueue::IsEosFrame(std::shared_ptr<AVBuffer>& sampleBuffer) const
544 {
545 return (sampleBuffer != nullptr) && (sampleBuffer->flag_ & static_cast<uint32_t>(Plugins::AVBufferFlag::EOS));
546 }
547
OnBufferAvailable()548 void SampleQueue::OnBufferAvailable()
549 {
550 MEDIA_LOG_DD(PUBLIC_LOG_S " OnBufferAvailable sampleBufferQueueProducer_ size=" PUBLIC_LOG_U32,
551 config_.queueName_.c_str(),
552 sampleBufferQueueProducer_->GetQueueSize());
553 auto sampleQueueCb = sampleQueueCb_.lock();
554 if (sampleQueueCb != nullptr) {
555 MEDIA_LOG_D(PUBLIC_LOG_S " OnSampleQueueBufferAvailable ", config_.queueName_.c_str());
556 sampleQueueCb->OnSampleQueueBufferAvailable(config_.queueId_);
557 }
558 }
559
OnBufferConsumer()560 void SampleQueue::OnBufferConsumer()
561 {
562 MEDIA_LOG_DD(PUBLIC_LOG_S " OnBufferConsumer ", config_.queueName_.c_str());
563 auto sampleQueueCb = sampleQueueCb_.lock();
564 if (sampleQueueCb != nullptr) {
565 MEDIA_LOG_DD(PUBLIC_LOG_S " OnSampleQueueBufferConsume ", config_.queueName_.c_str());
566 sampleQueueCb->OnSampleQueueBufferConsume(config_.queueId_);
567 }
568 }
569
UpdateQueueId(int32_t queueId)570 void SampleQueue::UpdateQueueId(int32_t queueId)
571 {
572 MEDIA_LOG_I(PUBLIC_LOG_S " change queueId to " PUBLIC_LOG_D32, config_.queueName_.c_str(), queueId);
573 config_.queueId_ = queueId;
574 }
575
GetCacheDuration() const576 uint64_t SampleQueue::GetCacheDuration() const
577 {
578 if (lastEnterSamplePts_ == Plugins::HST_TIME_NONE || lastOutSamplePts_ == Plugins::HST_TIME_NONE) {
579 return 0;
580 }
581 int64_t diff = lastEnterSamplePts_ - lastOutSamplePts_;
582 MEDIA_LOG_DD(PUBLIC_LOG_S " lastEnterSamplePts_=" PUBLIC_LOG_D64 " lastEndSamplePts_=" PUBLIC_LOG_D64
583 " diff=" PUBLIC_LOG_D64, config_.queueName_.c_str(), lastEnterSamplePts_, lastOutSamplePts_, diff);
584 return (diff > 0) ? static_cast<uint64_t>(diff) : 0;
585 }
586
GetMemoryUsage()587 uint32_t SampleQueue::GetMemoryUsage()
588 {
589 FALSE_RETURN_V_MSG_E(sampleBufferQueue_ != nullptr, 0, "bufferQueue nullptr");
590 return sampleBufferQueue_->GetMemoryUsage();
591 }
592
StringifyMeta(std::shared_ptr<Meta> & meta)593 std::string SampleQueue::StringifyMeta(std::shared_ptr<Meta> &meta)
594 {
595 FALSE_RETURN_V(meta != nullptr, "");
596 std::stringstream dumpStream;
597 for (auto iter = meta->begin(); iter != meta->end(); ++iter) {
598 switch (meta->GetValueType(iter->first)) {
599 case AnyValueType::INT32_T:
600 dumpStream << iter->first << " = " << std::to_string(AnyCast<int32_t>(iter->second)) << " | ";
601 break;
602 case AnyValueType::UINT32_T:
603 dumpStream << iter->first << " = " << std::to_string(AnyCast<uint32_t>(iter->second)) << " | ";
604 break;
605 case AnyValueType::BOOL:
606 dumpStream << iter->first << " = " << std::to_string(AnyCast<bool>(iter->second)) << " | ";
607 break;
608 case AnyValueType::DOUBLE:
609 dumpStream << iter->first << " = " << std::to_string(AnyCast<double>(iter->second)) << " | ";
610 break;
611 case AnyValueType::INT64_T:
612 dumpStream << iter->first << " = " << std::to_string(AnyCast<int64_t>(iter->second)) << " | ";
613 break;
614 case AnyValueType::FLOAT:
615 dumpStream << iter->first << " = " << std::to_string(AnyCast<float>(iter->second)) << " | ";
616 break;
617 case AnyValueType::STRING:
618 dumpStream << iter->first << " = " << AnyCast<std::string>(iter->second) << " | ";
619 break;
620 default:
621 dumpStream << iter->first << " = " << "unknown type | ";
622 break;
623 }
624 }
625 return dumpStream.str();
626 }
627
IsEmpty()628 bool SampleQueue::IsEmpty()
629 {
630 return sampleBufferQueueConsumer_->GetFilledBufferSize() == 0;
631 }
632 } // namespace Media
633 } // namespace OHOS
634