1 /*
2 * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "avbuffer_queue_consumer_impl.h"
17 #include "avbuffer_queue_impl.h"
18 #include "avbuffer_queue_producer_impl.h"
19 #include "common/log.h"
20 #include "meta/media_types.h"
21
22 namespace {
23 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "AVBufferQueue" };
24 static constexpr uint8_t LOG_LIMIT_LOW_FREQ = 64;
25 constexpr int64_t MILLISECONDS_TO_MICROSECONDS = 1000;
26 }
27
28 namespace OHOS {
29 namespace Media {
30
Create(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)31 std::shared_ptr<AVBufferQueue> AVBufferQueue::Create(
32 uint32_t size, MemoryType type, const std::string& name, bool disableAlloc)
33 {
34 MEDIA_LOG_D("AVBufferQueue::Create size = %u, type = %u, name = %s",
35 size, static_cast<uint32_t>(type), name.c_str());
36 return std::make_shared<AVBufferQueueImpl>(size, type, name, disableAlloc);
37 }
38
GetLocalProducer()39 std::shared_ptr<AVBufferQueueProducer> AVBufferQueueImpl::GetLocalProducer()
40 {
41 std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
42 std::shared_ptr<AVBufferQueueProducerImpl> producer = nullptr;
43 if (localProducer_.expired()) {
44 auto shared_this = shared_from_this();
45 FALSE_RETURN_V(shared_this != nullptr, nullptr);
46 producer = std::make_shared<AVBufferQueueProducerImpl>(shared_this);
47 localProducer_ = producer;
48 }
49
50 return localProducer_.lock();
51 }
52
GetLocalConsumer()53 std::shared_ptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetLocalConsumer()
54 {
55 std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
56 std::shared_ptr<AVBufferQueueConsumerImpl> consumer = nullptr;
57 if (localConsumer_.expired()) {
58 auto shared_this = shared_from_this();
59 FALSE_RETURN_V(shared_this != nullptr, nullptr);
60 consumer = std::make_shared<AVBufferQueueConsumerImpl>(shared_this);
61 localConsumer_ = consumer;
62 }
63 return localConsumer_.lock();
64 }
65
GetProducer()66 sptr<AVBufferQueueProducer> AVBufferQueueImpl::GetProducer()
67 {
68 std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
69 sptr<AVBufferQueueProducerImpl> producer = nullptr;
70 if (producer_ == nullptr || producer_->GetSptrRefCount() <= 0) {
71 auto shared_this = shared_from_this();
72 FALSE_RETURN_V(shared_this != nullptr, nullptr);
73 producer = new AVBufferQueueProducerImpl(shared_this);
74 producer_ = producer;
75 }
76
77 return producer_.promote();
78 }
79
GetConsumer()80 sptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetConsumer()
81 {
82 std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
83 sptr<AVBufferQueueConsumerImpl> consumer = nullptr;
84 if (consumer_ == nullptr || consumer_->GetSptrRefCount() <= 0) {
85 auto shared_this = shared_from_this();
86 FALSE_RETURN_V(shared_this != nullptr, nullptr);
87 consumer = new AVBufferQueueConsumerImpl(shared_this);
88 consumer_ = consumer;
89 }
90
91 return consumer_.promote();
92 }
93
AVBufferQueueImpl(const std::string & name)94 AVBufferQueueImpl::AVBufferQueueImpl(const std::string &name)
95 : AVBufferQueue(), name_(name), size_(0), memoryType_(MemoryType::UNKNOWN_MEMORY), disableAlloc_(false) {}
96
AVBufferQueueImpl(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)97 AVBufferQueueImpl::AVBufferQueueImpl(uint32_t size, MemoryType type, const std::string &name, bool disableAlloc)
98 : AVBufferQueue(), name_(name), size_(size), memoryType_(type), disableAlloc_(disableAlloc)
99 {
100 if (size_ > AVBUFFER_QUEUE_MAX_QUEUE_SIZE) {
101 size_ = AVBUFFER_QUEUE_MAX_QUEUE_SIZE;
102 }
103 }
104
GetQueueSize()105 uint32_t AVBufferQueueImpl::GetQueueSize()
106 {
107 return size_;
108 }
109
SetQueueSize(uint32_t size)110 Status AVBufferQueueImpl::SetQueueSize(uint32_t size)
111 {
112 FALSE_RETURN_V(size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_,
113 Status::ERROR_INVALID_BUFFER_SIZE);
114
115 return SetLargerQueueSize(size);
116 }
117
SetLargerQueueSize(uint32_t size)118 Status AVBufferQueueImpl::SetLargerQueueSize(uint32_t size)
119 {
120 FALSE_RETURN_V(size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE_FOR_LARGER && size != size_,
121 Status::ERROR_INVALID_BUFFER_SIZE);
122
123 if (size > size_) {
124 size_ = size;
125 if (!disableAlloc_) {
126 requestCondition.notify_all();
127 }
128 } else {
129 std::lock_guard<std::mutex> lockGuard(queueMutex_);
130 DeleteBuffers(size_ - size);
131 size_ = size;
132 }
133
134 return Status::OK;
135 }
136
ClearBufferIf(std::function<bool (const std::shared_ptr<AVBuffer> &)> pred)137 Status AVBufferQueueImpl::ClearBufferIf(std::function<bool(const std::shared_ptr<AVBuffer>&)> pred)
138 {
139 std::lock_guard<std::mutex> lockGuard(queueMutex_);
140 for (auto dirtyIt = dirtyBufferList_.begin(); dirtyIt != dirtyBufferList_.end();) {
141 uint64_t uniqueId = *dirtyIt;
142 auto cacheIt = cachedBufferMap_.find(uniqueId);
143 if (cacheIt == cachedBufferMap_.end()) {
144 MEDIA_LOG_E("unexpected buffer uniqueId=" PUBLIC_LOG_U64, uniqueId);
145 ++dirtyIt;
146 continue;
147 }
148 if (cacheIt->second.state != AVBUFFER_STATE_PUSHED && cacheIt->second.state != AVBUFFER_STATE_RETURNED) {
149 MEDIA_LOG_I("ignore unexpected buffer status uniqueId=" PUBLIC_LOG_U64 ",state= " PUBLIC_LOG_D32,
150 uniqueId,
151 static_cast<int32_t>(cacheIt->second.state));
152 ++dirtyIt;
153 continue;
154 }
155
156 if (pred(cacheIt->second.buffer)) {
157 MEDIA_LOG_D("ClearBufferIf pred ok uniqueId=" PUBLIC_LOG_U64 ",pts=" PUBLIC_LOG_D64,
158 uniqueId,
159 cacheIt->second.buffer->pts_);
160 cacheIt->second.state = AVBUFFER_STATE_RELEASED;
161 InsertFreeBufferInOrder(uniqueId);
162 dirtyIt = dirtyBufferList_.erase(dirtyIt);
163 } else {
164 ++dirtyIt;
165 }
166 }
167 requestCondition.notify_all();
168 return Status::OK;
169 }
170
IsBufferInQueue(const std::shared_ptr<AVBuffer> & buffer)171 bool AVBufferQueueImpl::IsBufferInQueue(const std::shared_ptr<AVBuffer>& buffer)
172 {
173 FALSE_RETURN_V(buffer != nullptr, false);
174 auto uniqueId = buffer->GetUniqueId();
175 return cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end();
176 }
177
GetCachedBufferCount() const178 uint32_t AVBufferQueueImpl::GetCachedBufferCount() const
179 {
180 // 确保cachedBufferMap_.size()不会超过MAX_UINT32
181 return static_cast<uint32_t>(cachedBufferMap_.size());
182 }
183
PopFromFreeBufferList(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)184 Status AVBufferQueueImpl::PopFromFreeBufferList(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
185 {
186 for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
187 if (config <= cachedBufferMap_[*it].config) {
188 buffer = cachedBufferMap_[*it].buffer;
189 freeBufferList_.erase(it);
190 return Status::OK;
191 }
192 }
193
194 if (freeBufferList_.empty()) {
195 buffer = nullptr;
196 // 没有可以重用的freeBuffer
197 return Status::ERROR_NO_FREE_BUFFER;
198 }
199
200 buffer = cachedBufferMap_[freeBufferList_.front()].buffer;
201 freeBufferList_.pop_front();
202
203 return Status::OK;
204 }
205
PopFromDirtyBufferList(std::shared_ptr<AVBuffer> & buffer)206 Status AVBufferQueueImpl::PopFromDirtyBufferList(std::shared_ptr<AVBuffer>& buffer)
207 {
208 FALSE_RETURN_V_NOLOG(!dirtyBufferList_.empty(), Status::ERROR_NO_DIRTY_BUFFER);
209
210 buffer = cachedBufferMap_[dirtyBufferList_.front()].buffer;
211 dirtyBufferList_.pop_front();
212 return Status::OK;
213 }
214
AllocBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)215 Status AVBufferQueueImpl::AllocBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
216 {
217 auto bufferImpl = AVBuffer::CreateAVBuffer(config);
218 FALSE_RETURN_V(bufferImpl != nullptr, Status::ERROR_CREATE_BUFFER);
219
220 auto uniqueId = bufferImpl->GetUniqueId();
221 AVBufferElement ele = {
222 .config = bufferImpl->GetConfig(),
223 .state = AVBUFFER_STATE_RELEASED,
224 .isDeleting = false,
225 .buffer = bufferImpl,
226 };
227 cachedBufferMap_[uniqueId] = ele;
228 buffer = bufferImpl;
229 TotalMemoryCalculation(true, ele.config.capacity);
230
231 return Status::OK;
232 }
233
RequestReuseBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)234 Status AVBufferQueueImpl::RequestReuseBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
235 {
236 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
237
238 auto uniqueId = buffer->GetUniqueId();
239 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_CREATE_BUFFER);
240
241 if (config <= cachedBufferMap_[uniqueId].config) {
242 // 不需要重新分配,直接更新buffer大小
243 cachedBufferMap_[uniqueId].config.size = config.size;
244 } else {
245 // 重新分配
246 DeleteCachedBufferById(uniqueId);
247 NOK_RETURN(AllocBuffer(buffer, config));
248 }
249
250 // 注意这里的uniqueId可能因为重新分配buffer而更新,所以需要再次获取
251 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
252 return Status::OK;
253 }
254
DeleteBuffers(uint32_t count)255 void AVBufferQueueImpl::DeleteBuffers(uint32_t count)
256 {
257 FALSE_RETURN(count > 0);
258
259 while (!freeBufferList_.empty()) {
260 DeleteCachedBufferById(freeBufferList_.front());
261 freeBufferList_.pop_front();
262 count--;
263 if (count <= 0) {
264 return;
265 }
266 }
267
268 while (!dirtyBufferList_.empty()) {
269 DeleteCachedBufferById(dirtyBufferList_.front());
270 dirtyBufferList_.pop_front();
271 count--;
272 if (count <= 0) {
273 return;
274 }
275 }
276
277 for (auto&& ele : cachedBufferMap_) {
278 ele.second.isDeleting = true;
279 // we don't have to do anything
280 count--;
281 if (count <= 0) {
282 break;
283 }
284 }
285 }
286
DeleteCachedBufferById(uint64_t uniqueId)287 void AVBufferQueueImpl::DeleteCachedBufferById(uint64_t uniqueId)
288 {
289 auto it = cachedBufferMap_.find(uniqueId);
290 if (it != cachedBufferMap_.end()) {
291 MEDIA_LOG_D("DeleteCachedBufferById uniqueId:%llu, state:%d", uniqueId, it->second.state);
292 TotalMemoryCalculation(false, it->second.config.capacity);
293 cachedBufferMap_.erase(it);
294 }
295 }
296
CheckConfig(const AVBufferConfig & config)297 Status AVBufferQueueImpl::CheckConfig(const AVBufferConfig& config)
298 {
299 if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
300 MEDIA_LOG_D("config.memoryType != MemoryType::UNKNOWN_MEMORY");
301 return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
302 }
303 // memoryType_初始化之后将无法改变。
304 if (memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_) {
305 MEDIA_LOG_D("memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_");
306 return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
307 }
308 memoryType_ = config.memoryType;
309 return Status::OK;
310 }
311
wait_for(std::unique_lock<std::mutex> & lock,int64_t timeoutUs)312 bool AVBufferQueueImpl::wait_for(std::unique_lock<std::mutex>& lock, int64_t timeoutUs)
313 {
314 MEDIA_LOG_D("wait for free buffer, timeout = %{public}" PRId64, timeoutUs);
315 if (timeoutUs > 0) {
316 return requestCondition.wait_for(
317 lock, std::chrono::microseconds(timeoutUs), [this]() {
318 return !freeBufferList_.empty() || (GetCachedBufferCount() < GetQueueSize());
319 });
320 } else if (timeoutUs < 0) {
321 requestCondition.wait(lock);
322 }
323 return true;
324 }
325
RequestBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config,int32_t timeoutMs)326 Status AVBufferQueueImpl::RequestBuffer(
327 std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config, int32_t timeoutMs)
328 {
329 return RequestBufferWaitUs(buffer, config, static_cast<int64_t>(timeoutMs) * MILLISECONDS_TO_MICROSECONDS);
330 }
331
RequestBufferWaitUs(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config,int64_t timeoutUs)332 Status AVBufferQueueImpl::RequestBufferWaitUs(
333 std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config, int64_t timeoutUs)
334 {
335 auto configCopy = config;
336 if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
337 MEDIA_LOG_D("AVBufferQueueImpl::RequestBuffer config.memoryType unknown, "
338 "memoryType_ = %u", static_cast<uint32_t>(memoryType_));
339 configCopy.memoryType = memoryType_;
340 }
341
342 // check param
343 std::unique_lock<std::mutex> lock(queueMutex_);
344 auto res = CheckConfig(configCopy);
345 FALSE_RETURN_V_MSG(res == Status::OK,
346 res, "CheckConfig not OK, code %{public}d", static_cast<int32_t>(res));
347 // dequeue from free list
348 auto ret = PopFromFreeBufferList(buffer, configCopy);
349 if (ret == Status::OK) {
350 return RequestReuseBuffer(buffer, configCopy);
351 }
352
353 // check queue size
354 if (GetCachedBufferCount() >= GetQueueSize()) {
355 if (!wait_for(lock, timeoutUs)) {
356 MEDIA_LOG_D("FALSE_RETURN_V wait_for(lock, timeoutUs)");
357 return Status::ERROR_WAIT_TIMEOUT;
358 }
359 // 被条件唤醒后,再次尝试从freeBufferList中取buffer
360 ret = PopFromFreeBufferList(buffer, configCopy);
361 if (ret == Status::OK) {
362 return RequestReuseBuffer(buffer, configCopy);
363 }
364 if (GetCachedBufferCount() >= GetQueueSize()) {
365 return Status::ERROR_NO_FREE_BUFFER;
366 }
367 }
368
369 NOK_RETURN(AllocBuffer(buffer, configCopy));
370 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
371
372 return Status::OK;
373 }
374
InsertFreeBufferInOrder(uint64_t uniqueId)375 void AVBufferQueueImpl::InsertFreeBufferInOrder(uint64_t uniqueId)
376 {
377 for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
378 if ((*it != uniqueId) &&
379 (cachedBufferMap_[*it].config.capacity >= cachedBufferMap_[uniqueId].config.capacity)) {
380 freeBufferList_.insert(it, uniqueId);
381 return;
382 }
383 }
384 freeBufferList_.emplace_back(uniqueId);
385 }
386
CancelBuffer(uint64_t uniqueId)387 Status AVBufferQueueImpl::CancelBuffer(uint64_t uniqueId)
388 {
389 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
390
391 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_REQUESTED ||
392 cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
393 Status::ERROR_INVALID_BUFFER_STATE);
394
395 InsertFreeBufferInOrder(uniqueId);
396
397 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
398
399 requestCondition.notify_all();
400
401 MEDIA_LOG_D("cancel buffer id = %llu", uniqueId);
402
403 return Status::OK;
404 }
405
PushBuffer(uint64_t uniqueId,bool available)406 Status AVBufferQueueImpl::PushBuffer(uint64_t uniqueId, bool available)
407 {
408 std::shared_ptr<AVBuffer> buffer = nullptr;
409 {
410 std::lock_guard<std::mutex> lockGuard(queueMutex_);
411 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
412 Status::ERROR_INVALID_BUFFER_ID);
413
414 auto& ele = cachedBufferMap_[uniqueId];
415 if (ele.isDeleting) {
416 DeleteCachedBufferById(uniqueId);
417 MEDIA_LOG_D("delete push buffer uniqueId(%llu)", uniqueId);
418 return Status::OK;
419 }
420
421 if (available) {
422 FALSE_RETURN_V(ele.buffer->GetConfig().size >= 0, Status::ERROR_INVALID_BUFFER_SIZE);
423 }
424
425 FALSE_RETURN_V(ele.state == AVBUFFER_STATE_REQUESTED || ele.state == AVBUFFER_STATE_ATTACHED,
426 Status::ERROR_INVALID_BUFFER_STATE);
427
428 ele.state = AVBUFFER_STATE_PUSHED;
429 buffer = cachedBufferMap_[uniqueId].buffer;
430 }
431
432 if (available) {
433 std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
434 if (!brokerListeners_.empty() && brokerListeners_.back() != nullptr) {
435 brokerListeners_.back()->OnBufferFilled(buffer);
436 return Status::OK;
437 }
438 }
439
440 return ReturnBuffer(uniqueId, available);
441 }
442
PushBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)443 Status AVBufferQueueImpl::PushBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
444 {
445 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
446
447 return PushBuffer(buffer->GetUniqueId(), available);
448 }
449
ReturnBuffer(uint64_t uniqueId,bool available)450 Status __attribute__((no_sanitize("cfi"))) AVBufferQueueImpl::ReturnBuffer(uint64_t uniqueId, bool available)
451 {
452 {
453 std::lock_guard<std::mutex> lockGuard(queueMutex_);
454 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
455 Status::ERROR_INVALID_BUFFER_ID);
456
457 if (cachedBufferMap_[uniqueId].isDeleting) {
458 DeleteCachedBufferById(uniqueId);
459 MEDIA_LOG_D("delete return buffer uniqueId(%llu)", uniqueId);
460 return Status::OK;
461 }
462
463 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
464 Status::ERROR_INVALID_BUFFER_STATE);
465
466 if (!available) {
467 NOK_RETURN(CancelBuffer(uniqueId));
468 } else {
469 auto& config = cachedBufferMap_[uniqueId].buffer->GetConfig();
470 bool isEosBuffer = cachedBufferMap_[uniqueId].buffer->flag_ & (uint32_t)(Plugins::AVBufferFlag::EOS);
471 if (!isEosBuffer) {
472 FALSE_RETURN_V(config.size > 0, Status::ERROR_INVALID_BUFFER_SIZE);
473 }
474 cachedBufferMap_[uniqueId].config = config;
475 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RETURNED;
476 dirtyBufferList_.push_back(uniqueId);
477 }
478 }
479
480 if (!available) {
481 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
482 if (producerListener_ != nullptr) {
483 producerListener_->OnBufferAvailable();
484 }
485 return Status::OK;
486 }
487
488 std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
489 FALSE_RETURN_V(consumerListener_ != nullptr, Status::ERROR_NO_CONSUMER_LISTENER);
490 consumerListener_->OnBufferAvailable();
491
492 return Status::OK;
493 }
494
ReturnBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)495 Status AVBufferQueueImpl::ReturnBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
496 {
497 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
498
499 return ReturnBuffer(buffer->GetUniqueId(), available);
500 }
501
GetMemoryUsage()502 uint32_t AVBufferQueueImpl::GetMemoryUsage()
503 {
504 return memoryUsage_.load();
505 }
506
SetQueueSizeAndAttachBuffer(uint32_t size,std::shared_ptr<AVBuffer> & buffer,bool isFilled)507 Status AVBufferQueueImpl::SetQueueSizeAndAttachBuffer(uint32_t size,
508 std::shared_ptr<AVBuffer>& buffer, bool isFilled)
509 {
510 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
511 auto config = buffer->GetConfig();
512 auto uniqueId = buffer->GetUniqueId();
513 {
514 std::lock_guard<std::mutex> lockGuard(queueMutex_);
515 if (size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_) {
516 SetQueueSizeBeforeAttachBufferLocked(size);
517 }
518 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
519 Status::ERROR_INVALID_BUFFER_ID);
520 NOK_RETURN(CheckConfig(config));
521 Status result = AttachAvailableBufferLocked(buffer);
522 FALSE_RETURN_V(result == Status::OK, result);
523 }
524 if (isFilled) {
525 return PushBufferOnFilled(uniqueId, isFilled);
526 }
527 return ReleaseBuffer(uniqueId);
528 }
529
TotalMemoryCalculation(bool isAdd,int32_t capacity)530 void AVBufferQueueImpl::TotalMemoryCalculation(bool isAdd, int32_t capacity)
531 {
532 FALSE_RETURN(capacity > 0);
533 uint32_t capacityUint = static_cast<uint32_t>(capacity);
534 if (isAdd) {
535 memoryUsage_.fetch_add(capacityUint);
536 } else {
537 FALSE_RETURN(capacityUint <= memoryUsage_.load());
538 memoryUsage_.fetch_sub(capacityUint);
539 }
540 }
541
AttachAvailableBufferLocked(std::shared_ptr<AVBuffer> & buffer)542 Status AVBufferQueueImpl::AttachAvailableBufferLocked(std::shared_ptr<AVBuffer>& buffer)
543 {
544 auto config = buffer->GetConfig();
545 auto uniqueId = buffer->GetUniqueId();
546 AVBufferElement ele = {
547 .config = config,
548 .state = AVBUFFER_STATE_ATTACHED,
549 .isDeleting = false,
550 .buffer = buffer
551 };
552
553 auto cachedCount = GetCachedBufferCount();
554 auto queueSize = GetQueueSize();
555 if (cachedCount >= queueSize) {
556 auto validCount = static_cast<uint32_t>(dirtyBufferList_.size() + freeBufferList_.size());
557 auto toBeDeleteCount = cachedCount - queueSize;
558 // 这里表示有可以删除的buffer,或者
559 if (validCount > toBeDeleteCount) {
560 // 在什么场景下需要在此处删除buffer?
561 DeleteBuffers(toBeDeleteCount + 1); // 多删除一个,用于attach当前buffer
562 cachedBufferMap_[uniqueId] = ele;
563 TotalMemoryCalculation(true, ele.config.capacity);
564 MEDIA_LOG_D("uniqueId(%llu) attached with delete", uniqueId);
565 } else {
566 MEDIA_LOG_E("attach failed, out of range");
567 return Status::ERROR_OUT_OF_RANGE;
568 }
569 } else {
570 cachedBufferMap_[uniqueId] = ele;
571 TotalMemoryCalculation(true, ele.config.capacity);
572 MEDIA_LOG_D("uniqueId(%llu) attached without delete", uniqueId);
573 }
574 return Status::OK;
575 }
576
PushBufferOnFilled(uint64_t uniqueId,bool isFilled)577 Status AVBufferQueueImpl::PushBufferOnFilled(uint64_t uniqueId, bool isFilled)
578 {
579 auto ret = PushBuffer(uniqueId, isFilled);
580 if (ret != Status::OK) {
581 // PushBuffer失败,强制Detach
582 DetachBuffer(uniqueId, true);
583 }
584 return ret;
585 }
586
SetQueueSizeBeforeAttachBufferLocked(uint32_t size)587 void AVBufferQueueImpl::SetQueueSizeBeforeAttachBufferLocked(uint32_t size)
588 {
589 if (size > size_) {
590 size_ = size;
591 if (!disableAlloc_) {
592 requestCondition.notify_all();
593 }
594 } else {
595 DeleteBuffers(size_ - size);
596 size_ = size;
597 }
598 }
599
AttachBuffer(std::shared_ptr<AVBuffer> & buffer,bool isFilled)600 Status AVBufferQueueImpl::AttachBuffer(std::shared_ptr<AVBuffer>& buffer, bool isFilled)
601 {
602 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
603
604 auto config = buffer->GetConfig();
605 auto uniqueId = buffer->GetUniqueId();
606 {
607 std::lock_guard<std::mutex> lockGuard(queueMutex_);
608 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
609 Status::ERROR_INVALID_BUFFER_ID);
610
611 NOK_RETURN(CheckConfig(config));
612
613 Status result = AttachAvailableBufferLocked(buffer);
614 FALSE_RETURN_V(result == Status::OK, result);
615 }
616
617 if (isFilled) {
618 return PushBufferOnFilled(uniqueId, isFilled);
619 }
620
621 return ReleaseBuffer(uniqueId);
622 }
623
DetachBuffer(uint64_t uniqueId,bool force)624 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId, bool force)
625 {
626 FALSE_RETURN_V_NOLOG(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
627 Status::ERROR_INVALID_BUFFER_ID);
628
629 const auto& ele = cachedBufferMap_[uniqueId];
630
631 if (!force) {
632 // 只有生产者或消费者在获取到buffer后才能detach
633 if (ele.state == AVBUFFER_STATE_REQUESTED) {
634 MEDIA_LOG_D("detach buffer(%llu) on state requested", uniqueId);
635 } else if (ele.state == AVBUFFER_STATE_ACQUIRED) {
636 MEDIA_LOG_D("detach buffer(%llu) on state acquired", uniqueId);
637 } else {
638 MEDIA_LOGW_LIMIT(LOG_LIMIT_LOW_FREQ, "detach buffer(" PUBLIC_LOG_U64 ") on state "
639 PUBLIC_LOG_D32 "forbidden", uniqueId, ele.state);
640 return Status::ERROR_INVALID_BUFFER_STATE;
641 }
642 }
643 TotalMemoryCalculation(false, ele.config.capacity);
644 cachedBufferMap_.erase(uniqueId);
645
646 return Status::OK;
647 }
648
DetachBuffer(uint64_t uniqueId)649 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId)
650 {
651 std::lock_guard<std::mutex> lockGuard(queueMutex_);
652 return DetachBuffer(uniqueId, false);
653 }
654
DetachBuffer(const std::shared_ptr<AVBuffer> & buffer)655 Status AVBufferQueueImpl::DetachBuffer(const std::shared_ptr<AVBuffer>& buffer)
656 {
657 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
658
659 return DetachBuffer(buffer->GetUniqueId());
660 }
661
AcquireBuffer(std::shared_ptr<AVBuffer> & buffer)662 Status AVBufferQueueImpl::AcquireBuffer(std::shared_ptr<AVBuffer>& buffer)
663 {
664 std::lock_guard<std::mutex> lockGuard(queueMutex_);
665 auto ret = PopFromDirtyBufferList(buffer);
666 FALSE_RETURN_V_MSG_D(ret == Status::OK, ret, "acquire buffer failed");
667
668 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_ACQUIRED;
669
670 return Status::OK;
671 }
672
ReleaseBuffer(uint64_t uniqueId)673 Status AVBufferQueueImpl::ReleaseBuffer(uint64_t uniqueId)
674 {
675 {
676 std::lock_guard<std::mutex> lockGuard(queueMutex_);
677 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
678
679 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ACQUIRED ||
680 cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ATTACHED, Status::ERROR_INVALID_BUFFER_STATE);
681
682 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
683 if (cachedBufferMap_[uniqueId].isDeleting) {
684 DeleteCachedBufferById(uniqueId);
685 return Status::OK;
686 }
687
688 InsertFreeBufferInOrder(uniqueId);
689
690 requestCondition.notify_all();
691 }
692
693 // 注意:此时通知生产者有buffer可用,但实际有可能已经被request wait的生产者获取
694 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
695 if (producerListener_ != nullptr) {
696 producerListener_->OnBufferAvailable();
697 }
698
699 return Status::OK;
700 }
701
ReleaseBuffer(const std::shared_ptr<AVBuffer> & buffer)702 Status AVBufferQueueImpl::ReleaseBuffer(const std::shared_ptr<AVBuffer>& buffer)
703 {
704 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
705
706 return ReleaseBuffer(buffer->GetUniqueId());
707 }
708
Clear()709 Status AVBufferQueueImpl::Clear()
710 {
711 MEDIA_LOG_E("AVBufferQueueImpl Clear");
712 std::lock_guard<std::mutex> lockGuard(queueMutex_);
713 dirtyBufferList_.clear();
714 for (auto it = cachedBufferMap_.begin(); it != cachedBufferMap_.end(); it++) {
715 if (it->second.state == AVBUFFER_STATE_PUSHED || it->second.state == AVBUFFER_STATE_RETURNED) {
716 it->second.state = AVBUFFER_STATE_RELEASED;
717 InsertFreeBufferInOrder(it->first);
718 }
719 }
720 requestCondition.notify_all();
721 return Status::OK;
722 }
723
SetBrokerListener(sptr<IBrokerListener> & listener)724 Status AVBufferQueueImpl::SetBrokerListener(sptr<IBrokerListener>& listener)
725 {
726 std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
727 brokerListeners_.push_back(listener);
728 return Status::OK;
729 }
730
RemoveBrokerListener(sptr<IBrokerListener> & listener)731 Status AVBufferQueueImpl::RemoveBrokerListener(sptr<IBrokerListener>& listener)
732 {
733 std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
734 if (!brokerListeners_.empty() && listener == brokerListeners_.back()) {
735 brokerListeners_.pop_back();
736 MEDIA_LOG_I("RemoveBrokerListener success, size: %{public}d", brokerListeners_.size());
737 } else {
738 MEDIA_LOG_E("removed item is not the back one.");
739 }
740 return Status::OK;
741 }
742
SetProducerListener(sptr<IProducerListener> & listener)743 Status AVBufferQueueImpl::SetProducerListener(sptr<IProducerListener>& listener)
744 {
745 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
746 producerListener_ = listener;
747
748 return Status::OK;
749 }
750
SetConsumerListener(sptr<IConsumerListener> & listener)751 Status AVBufferQueueImpl::SetConsumerListener(sptr<IConsumerListener>& listener)
752 {
753 std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
754 consumerListener_ = listener;
755
756 return Status::OK;
757 }
758
GetFilledBufferSize()759 uint32_t AVBufferQueueImpl::GetFilledBufferSize()
760 {
761 std::lock_guard<std::mutex> lockGuard(queueMutex_);
762 return dirtyBufferList_.size();
763 }
764
765 } // namespace Media
766 } // namespace OHOS
767