1 /*
2 * Copyright (c) 2021-2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "avbuffer_queue_consumer_impl.h"
17 #include "avbuffer_queue_impl.h"
18 #include "avbuffer_queue_producer_impl.h"
19 #include "common/log.h"
20 #include "meta/media_types.h"
21
22 namespace {
23 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, LOG_DOMAIN_FOUNDATION, "AVBufferQueue" };
24 }
25
26 namespace OHOS {
27 namespace Media {
28
Create(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)29 std::shared_ptr<AVBufferQueue> AVBufferQueue::Create(
30 uint32_t size, MemoryType type, const std::string& name, bool disableAlloc)
31 {
32 MEDIA_LOG_D("AVBufferQueue::Create size = %u, type = %u, name = %s",
33 size, static_cast<uint32_t>(type), name.c_str());
34 return std::make_shared<AVBufferQueueImpl>(size, type, name, disableAlloc);
35 }
36
GetLocalProducer()37 std::shared_ptr<AVBufferQueueProducer> AVBufferQueueImpl::GetLocalProducer()
38 {
39 std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
40 std::shared_ptr<AVBufferQueueProducerImpl> producer = nullptr;
41 if (localProducer_.expired()) {
42 auto shared_this = shared_from_this();
43 FALSE_RETURN_V(shared_this != nullptr, nullptr);
44 producer = std::make_shared<AVBufferQueueProducerImpl>(shared_this);
45 localProducer_ = producer;
46 }
47
48 return localProducer_.lock();
49 }
50
GetLocalConsumer()51 std::shared_ptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetLocalConsumer()
52 {
53 std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
54 std::shared_ptr<AVBufferQueueConsumerImpl> consumer = nullptr;
55 if (localConsumer_.expired()) {
56 auto shared_this = shared_from_this();
57 FALSE_RETURN_V(shared_this != nullptr, nullptr);
58 consumer = std::make_shared<AVBufferQueueConsumerImpl>(shared_this);
59 localConsumer_ = consumer;
60 }
61 return localConsumer_.lock();
62 }
63
GetProducer()64 sptr<AVBufferQueueProducer> AVBufferQueueImpl::GetProducer()
65 {
66 std::lock_guard<std::mutex> lockGuard(producerCreatorMutex_);
67 sptr<AVBufferQueueProducerImpl> producer = nullptr;
68 if (producer_ == nullptr || producer_->GetSptrRefCount() <= 0) {
69 auto shared_this = shared_from_this();
70 FALSE_RETURN_V(shared_this != nullptr, nullptr);
71 producer = new AVBufferQueueProducerImpl(shared_this);
72 producer_ = producer;
73 }
74
75 return producer_.promote();
76 }
77
GetConsumer()78 sptr<AVBufferQueueConsumer> AVBufferQueueImpl::GetConsumer()
79 {
80 std::lock_guard<std::mutex> lockGuard(consumerCreatorMutex_);
81 sptr<AVBufferQueueConsumerImpl> consumer = nullptr;
82 if (consumer_ == nullptr || consumer_->GetSptrRefCount() <= 0) {
83 auto shared_this = shared_from_this();
84 FALSE_RETURN_V(shared_this != nullptr, nullptr);
85 consumer = new AVBufferQueueConsumerImpl(shared_this);
86 consumer_ = consumer;
87 }
88
89 return consumer_.promote();
90 }
91
AVBufferQueueImpl(const std::string & name)92 AVBufferQueueImpl::AVBufferQueueImpl(const std::string &name)
93 : AVBufferQueue(), name_(name), size_(0), memoryType_(MemoryType::UNKNOWN_MEMORY), disableAlloc_(false) {}
94
AVBufferQueueImpl(uint32_t size,MemoryType type,const std::string & name,bool disableAlloc)95 AVBufferQueueImpl::AVBufferQueueImpl(uint32_t size, MemoryType type, const std::string &name, bool disableAlloc)
96 : AVBufferQueue(), name_(name), size_(size), memoryType_(type), disableAlloc_(disableAlloc)
97 {
98 if (size_ > AVBUFFER_QUEUE_MAX_QUEUE_SIZE) {
99 size_ = AVBUFFER_QUEUE_MAX_QUEUE_SIZE;
100 }
101 }
102
GetQueueSize()103 uint32_t AVBufferQueueImpl::GetQueueSize()
104 {
105 return size_;
106 }
107
SetQueueSize(uint32_t size)108 Status AVBufferQueueImpl::SetQueueSize(uint32_t size)
109 {
110 FALSE_RETURN_V(size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_,
111 Status::ERROR_INVALID_BUFFER_SIZE);
112
113 return SetLargerQueueSize(size);
114 }
115
SetLargerQueueSize(uint32_t size)116 Status AVBufferQueueImpl::SetLargerQueueSize(uint32_t size)
117 {
118 FALSE_RETURN_V(size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE_FOR_LARGER && size != size_,
119 Status::ERROR_INVALID_BUFFER_SIZE);
120
121 if (size > size_) {
122 size_ = size;
123 if (!disableAlloc_) {
124 requestCondition.notify_all();
125 }
126 } else {
127 std::lock_guard<std::mutex> lockGuard(queueMutex_);
128 DeleteBuffers(size_ - size);
129 size_ = size;
130 }
131
132 return Status::OK;
133 }
134
ClearBufferIf(std::function<bool (const std::shared_ptr<AVBuffer> &)> pred)135 Status AVBufferQueueImpl::ClearBufferIf(std::function<bool(const std::shared_ptr<AVBuffer>&)> pred)
136 {
137 std::lock_guard<std::mutex> lockGuard(queueMutex_);
138 for (auto dirtyIt = dirtyBufferList_.begin(); dirtyIt != dirtyBufferList_.end();) {
139 uint64_t uniqueId = *dirtyIt;
140 auto cacheIt = cachedBufferMap_.find(uniqueId);
141 if (cacheIt == cachedBufferMap_.end()) {
142 MEDIA_LOG_E("unexpected buffer uniqueId=" PUBLIC_LOG_U64, uniqueId);
143 ++dirtyIt;
144 continue;
145 }
146 if (cacheIt->second.state != AVBUFFER_STATE_PUSHED && cacheIt->second.state != AVBUFFER_STATE_RETURNED) {
147 MEDIA_LOG_I("ignore unexpected buffer status uniqueId=" PUBLIC_LOG_U64 ",state= " PUBLIC_LOG_D32,
148 uniqueId,
149 static_cast<int32_t>(cacheIt->second.state));
150 ++dirtyIt;
151 continue;
152 }
153
154 if (pred(cacheIt->second.buffer)) {
155 MEDIA_LOG_D("ClearBufferIf pred ok uniqueId=" PUBLIC_LOG_U64 ",pts=" PUBLIC_LOG_D64,
156 uniqueId,
157 cacheIt->second.buffer->pts_);
158 cacheIt->second.state = AVBUFFER_STATE_RELEASED;
159 InsertFreeBufferInOrder(uniqueId);
160 dirtyIt = dirtyBufferList_.erase(dirtyIt);
161 } else {
162 ++dirtyIt;
163 }
164 }
165 requestCondition.notify_all();
166 return Status::OK;
167 }
168
IsBufferInQueue(const std::shared_ptr<AVBuffer> & buffer)169 bool AVBufferQueueImpl::IsBufferInQueue(const std::shared_ptr<AVBuffer>& buffer)
170 {
171 FALSE_RETURN_V(buffer != nullptr, false);
172 auto uniqueId = buffer->GetUniqueId();
173 return cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end();
174 }
175
GetCachedBufferCount() const176 uint32_t AVBufferQueueImpl::GetCachedBufferCount() const
177 {
178 // 确保cachedBufferMap_.size()不会超过MAX_UINT32
179 return static_cast<uint32_t>(cachedBufferMap_.size());
180 }
181
PopFromFreeBufferList(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)182 Status AVBufferQueueImpl::PopFromFreeBufferList(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
183 {
184 for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
185 if (config <= cachedBufferMap_[*it].config) {
186 buffer = cachedBufferMap_[*it].buffer;
187 freeBufferList_.erase(it);
188 return Status::OK;
189 }
190 }
191
192 if (freeBufferList_.empty()) {
193 buffer = nullptr;
194 // 没有可以重用的freeBuffer
195 return Status::ERROR_NO_FREE_BUFFER;
196 }
197
198 buffer = cachedBufferMap_[freeBufferList_.front()].buffer;
199 freeBufferList_.pop_front();
200
201 return Status::OK;
202 }
203
PopFromDirtyBufferList(std::shared_ptr<AVBuffer> & buffer)204 Status AVBufferQueueImpl::PopFromDirtyBufferList(std::shared_ptr<AVBuffer>& buffer)
205 {
206 FALSE_RETURN_V_NOLOG(!dirtyBufferList_.empty(), Status::ERROR_NO_DIRTY_BUFFER);
207
208 buffer = cachedBufferMap_[dirtyBufferList_.front()].buffer;
209 dirtyBufferList_.pop_front();
210 return Status::OK;
211 }
212
AllocBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)213 Status AVBufferQueueImpl::AllocBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
214 {
215 auto bufferImpl = AVBuffer::CreateAVBuffer(config);
216 FALSE_RETURN_V(bufferImpl != nullptr, Status::ERROR_CREATE_BUFFER);
217
218 auto uniqueId = bufferImpl->GetUniqueId();
219 AVBufferElement ele = {
220 .config = bufferImpl->GetConfig(),
221 .state = AVBUFFER_STATE_RELEASED,
222 .isDeleting = false,
223 .buffer = bufferImpl,
224 };
225 cachedBufferMap_[uniqueId] = ele;
226 buffer = bufferImpl;
227
228 return Status::OK;
229 }
230
RequestReuseBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config)231 Status AVBufferQueueImpl::RequestReuseBuffer(std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config)
232 {
233 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
234
235 auto uniqueId = buffer->GetUniqueId();
236 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_CREATE_BUFFER);
237
238 if (config <= cachedBufferMap_[uniqueId].config) {
239 // 不需要重新分配,直接更新buffer大小
240 cachedBufferMap_[uniqueId].config.size = config.size;
241 } else {
242 // 重新分配
243 DeleteCachedBufferById(uniqueId);
244 NOK_RETURN(AllocBuffer(buffer, config));
245 }
246
247 // 注意这里的uniqueId可能因为重新分配buffer而更新,所以需要再次获取
248 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
249 return Status::OK;
250 }
251
DeleteBuffers(uint32_t count)252 void AVBufferQueueImpl::DeleteBuffers(uint32_t count)
253 {
254 FALSE_RETURN(count > 0);
255
256 while (!freeBufferList_.empty()) {
257 DeleteCachedBufferById(freeBufferList_.front());
258 freeBufferList_.pop_front();
259 count--;
260 if (count <= 0) {
261 return;
262 }
263 }
264
265 while (!dirtyBufferList_.empty()) {
266 DeleteCachedBufferById(dirtyBufferList_.front());
267 dirtyBufferList_.pop_front();
268 count--;
269 if (count <= 0) {
270 return;
271 }
272 }
273
274 for (auto&& ele : cachedBufferMap_) {
275 ele.second.isDeleting = true;
276 // we don't have to do anything
277 count--;
278 if (count <= 0) {
279 break;
280 }
281 }
282 }
283
DeleteCachedBufferById(uint64_t uniqueId)284 void AVBufferQueueImpl::DeleteCachedBufferById(uint64_t uniqueId)
285 {
286 auto it = cachedBufferMap_.find(uniqueId);
287 if (it != cachedBufferMap_.end()) {
288 MEDIA_LOG_D("DeleteCachedBufferById uniqueId:%llu, state:%d", uniqueId, it->second.state);
289 cachedBufferMap_.erase(it);
290 }
291 }
292
CheckConfig(const AVBufferConfig & config)293 Status AVBufferQueueImpl::CheckConfig(const AVBufferConfig& config)
294 {
295 if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
296 MEDIA_LOG_D("config.memoryType != MemoryType::UNKNOWN_MEMORY");
297 return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
298 }
299 // memoryType_初始化之后将无法改变。
300 if (memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_) {
301 MEDIA_LOG_D("memoryType_ != MemoryType::UNKNOWN_MEMORY && config.memoryType != memoryType_");
302 return Status::ERROR_UNEXPECTED_MEMORY_TYPE;
303 }
304 memoryType_ = config.memoryType;
305 return Status::OK;
306 }
307
wait_for(std::unique_lock<std::mutex> & lock,int32_t timeoutMs)308 bool AVBufferQueueImpl::wait_for(std::unique_lock<std::mutex>& lock, int32_t timeoutMs)
309 {
310 MEDIA_LOG_D("wait for free buffer, timeout = %d", timeoutMs);
311 if (timeoutMs > 0) {
312 return requestCondition.wait_for(
313 lock, std::chrono::milliseconds(timeoutMs), [this]() {
314 return !freeBufferList_.empty() || (GetCachedBufferCount() < GetQueueSize());
315 });
316 } else if (timeoutMs < 0) {
317 requestCondition.wait(lock);
318 }
319 return true;
320 }
321
RequestBuffer(std::shared_ptr<AVBuffer> & buffer,const AVBufferConfig & config,int32_t timeoutMs)322 Status AVBufferQueueImpl::RequestBuffer(
323 std::shared_ptr<AVBuffer>& buffer, const AVBufferConfig& config, int32_t timeoutMs)
324 {
325 auto configCopy = config;
326 if (config.memoryType == MemoryType::UNKNOWN_MEMORY) {
327 MEDIA_LOG_D("AVBufferQueueImpl::RequestBuffer config.memoryType unknown, "
328 "memoryType_ = %u", static_cast<uint32_t>(memoryType_));
329 configCopy.memoryType = memoryType_;
330 }
331
332 // check param
333 std::unique_lock<std::mutex> lock(queueMutex_);
334 auto res = CheckConfig(configCopy);
335 FALSE_RETURN_V_MSG(res == Status::OK,
336 res, "CheckConfig not OK, code %{public}d", static_cast<int32_t>(res));
337 // dequeue from free list
338 auto ret = PopFromFreeBufferList(buffer, configCopy);
339 if (ret == Status::OK) {
340 return RequestReuseBuffer(buffer, configCopy);
341 }
342
343 // check queue size
344 if (GetCachedBufferCount() >= GetQueueSize()) {
345 if (!wait_for(lock, timeoutMs)) {
346 MEDIA_LOG_D("FALSE_RETURN_V wait_for(lock, timeoutMs)");
347 return Status::ERROR_WAIT_TIMEOUT;
348 }
349 // 被条件唤醒后,再次尝试从freeBufferList中取buffer
350 ret = PopFromFreeBufferList(buffer, configCopy);
351 if (ret == Status::OK) {
352 return RequestReuseBuffer(buffer, configCopy);
353 }
354 if (GetCachedBufferCount() >= GetQueueSize()) {
355 return Status::ERROR_NO_FREE_BUFFER;
356 }
357 }
358
359 NOK_RETURN(AllocBuffer(buffer, configCopy));
360 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_REQUESTED;
361
362 return Status::OK;
363 }
364
InsertFreeBufferInOrder(uint64_t uniqueId)365 void AVBufferQueueImpl::InsertFreeBufferInOrder(uint64_t uniqueId)
366 {
367 for (auto it = freeBufferList_.begin(); it != freeBufferList_.end(); it++) {
368 if ((*it != uniqueId) &&
369 (cachedBufferMap_[*it].config.capacity >= cachedBufferMap_[uniqueId].config.capacity)) {
370 freeBufferList_.insert(it, uniqueId);
371 return;
372 }
373 }
374 freeBufferList_.emplace_back(uniqueId);
375 }
376
CancelBuffer(uint64_t uniqueId)377 Status AVBufferQueueImpl::CancelBuffer(uint64_t uniqueId)
378 {
379 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
380
381 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_REQUESTED ||
382 cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
383 Status::ERROR_INVALID_BUFFER_STATE);
384
385 InsertFreeBufferInOrder(uniqueId);
386
387 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
388
389 requestCondition.notify_all();
390
391 MEDIA_LOG_D("cancel buffer id = %llu", uniqueId);
392
393 return Status::OK;
394 }
395
PushBuffer(uint64_t uniqueId,bool available)396 Status AVBufferQueueImpl::PushBuffer(uint64_t uniqueId, bool available)
397 {
398 std::shared_ptr<AVBuffer> buffer = nullptr;
399 {
400 std::lock_guard<std::mutex> lockGuard(queueMutex_);
401 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
402 Status::ERROR_INVALID_BUFFER_ID);
403
404 auto& ele = cachedBufferMap_[uniqueId];
405 if (ele.isDeleting) {
406 DeleteCachedBufferById(uniqueId);
407 MEDIA_LOG_D("delete push buffer uniqueId(%llu)", uniqueId);
408 return Status::OK;
409 }
410
411 if (available) {
412 FALSE_RETURN_V(ele.buffer->GetConfig().size >= 0, Status::ERROR_INVALID_BUFFER_SIZE);
413 }
414
415 FALSE_RETURN_V(ele.state == AVBUFFER_STATE_REQUESTED || ele.state == AVBUFFER_STATE_ATTACHED,
416 Status::ERROR_INVALID_BUFFER_STATE);
417
418 ele.state = AVBUFFER_STATE_PUSHED;
419 buffer = cachedBufferMap_[uniqueId].buffer;
420 }
421
422 if (available) {
423 std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
424 if (!brokerListeners_.empty() && brokerListeners_.back() != nullptr) {
425 brokerListeners_.back()->OnBufferFilled(buffer);
426 return Status::OK;
427 }
428 }
429
430 return ReturnBuffer(uniqueId, available);
431 }
432
PushBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)433 Status AVBufferQueueImpl::PushBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
434 {
435 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
436
437 return PushBuffer(buffer->GetUniqueId(), available);
438 }
439
ReturnBuffer(uint64_t uniqueId,bool available)440 Status __attribute__((no_sanitize("cfi"))) AVBufferQueueImpl::ReturnBuffer(uint64_t uniqueId, bool available)
441 {
442 {
443 std::lock_guard<std::mutex> lockGuard(queueMutex_);
444 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(),
445 Status::ERROR_INVALID_BUFFER_ID);
446
447 if (cachedBufferMap_[uniqueId].isDeleting) {
448 DeleteCachedBufferById(uniqueId);
449 MEDIA_LOG_D("delete return buffer uniqueId(%llu)", uniqueId);
450 return Status::OK;
451 }
452
453 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_PUSHED,
454 Status::ERROR_INVALID_BUFFER_STATE);
455
456 if (!available) {
457 NOK_RETURN(CancelBuffer(uniqueId));
458 } else {
459 auto& config = cachedBufferMap_[uniqueId].buffer->GetConfig();
460 bool isEosBuffer = cachedBufferMap_[uniqueId].buffer->flag_ & (uint32_t)(Plugins::AVBufferFlag::EOS);
461 if (!isEosBuffer) {
462 FALSE_RETURN_V(config.size > 0, Status::ERROR_INVALID_BUFFER_SIZE);
463 }
464 cachedBufferMap_[uniqueId].config = config;
465 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RETURNED;
466 dirtyBufferList_.push_back(uniqueId);
467 }
468 }
469
470 if (!available) {
471 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
472 if (producerListener_ != nullptr) {
473 producerListener_->OnBufferAvailable();
474 }
475 return Status::OK;
476 }
477
478 std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
479 FALSE_RETURN_V(consumerListener_ != nullptr, Status::ERROR_NO_CONSUMER_LISTENER);
480 consumerListener_->OnBufferAvailable();
481
482 return Status::OK;
483 }
484
ReturnBuffer(const std::shared_ptr<AVBuffer> & buffer,bool available)485 Status AVBufferQueueImpl::ReturnBuffer(const std::shared_ptr<AVBuffer>& buffer, bool available)
486 {
487 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
488
489 return ReturnBuffer(buffer->GetUniqueId(), available);
490 }
491
SetQueueSizeAndAttachBuffer(uint32_t size,std::shared_ptr<AVBuffer> & buffer,bool isFilled)492 Status AVBufferQueueImpl::SetQueueSizeAndAttachBuffer(uint32_t size,
493 std::shared_ptr<AVBuffer>& buffer, bool isFilled)
494 {
495 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
496 auto config = buffer->GetConfig();
497 auto uniqueId = buffer->GetUniqueId();
498 {
499 std::lock_guard<std::mutex> lockGuard(queueMutex_);
500 if (size >= 0 && size <= AVBUFFER_QUEUE_MAX_QUEUE_SIZE && size != size_) {
501 SetQueueSizeBeforeAttachBufferLocked(size);
502 }
503 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
504 Status::ERROR_INVALID_BUFFER_ID);
505 NOK_RETURN(CheckConfig(config));
506 Status result = AttachAvailableBufferLocked(buffer);
507 FALSE_RETURN_V(result == Status::OK, result);
508 }
509 if (isFilled) {
510 return PushBufferOnFilled(uniqueId, isFilled);
511 }
512 return ReleaseBuffer(uniqueId);
513 }
514
AttachAvailableBufferLocked(std::shared_ptr<AVBuffer> & buffer)515 Status AVBufferQueueImpl::AttachAvailableBufferLocked(std::shared_ptr<AVBuffer>& buffer)
516 {
517 auto config = buffer->GetConfig();
518 auto uniqueId = buffer->GetUniqueId();
519 AVBufferElement ele = {
520 .config = config,
521 .state = AVBUFFER_STATE_ATTACHED,
522 .isDeleting = false,
523 .buffer = buffer
524 };
525
526 auto cachedCount = GetCachedBufferCount();
527 auto queueSize = GetQueueSize();
528 if (cachedCount >= queueSize) {
529 auto validCount = static_cast<uint32_t>(dirtyBufferList_.size() + freeBufferList_.size());
530 auto toBeDeleteCount = cachedCount - queueSize;
531 // 这里表示有可以删除的buffer,或者
532 if (validCount > toBeDeleteCount) {
533 // 在什么场景下需要在此处删除buffer?
534 DeleteBuffers(toBeDeleteCount + 1); // 多删除一个,用于attach当前buffer
535 cachedBufferMap_[uniqueId] = ele;
536 MEDIA_LOG_D("uniqueId(%llu) attached with delete", uniqueId);
537 } else {
538 MEDIA_LOG_E("attach failed, out of range");
539 return Status::ERROR_OUT_OF_RANGE;
540 }
541 } else {
542 cachedBufferMap_[uniqueId] = ele;
543 MEDIA_LOG_D("uniqueId(%llu) attached without delete", uniqueId);
544 }
545 return Status::OK;
546 }
547
PushBufferOnFilled(uint64_t uniqueId,bool isFilled)548 Status AVBufferQueueImpl::PushBufferOnFilled(uint64_t uniqueId, bool isFilled)
549 {
550 auto ret = PushBuffer(uniqueId, isFilled);
551 if (ret != Status::OK) {
552 // PushBuffer失败,强制Detach
553 DetachBuffer(uniqueId, true);
554 }
555 return ret;
556 }
557
SetQueueSizeBeforeAttachBufferLocked(uint32_t size)558 void AVBufferQueueImpl::SetQueueSizeBeforeAttachBufferLocked(uint32_t size)
559 {
560 if (size > size_) {
561 size_ = size;
562 if (!disableAlloc_) {
563 requestCondition.notify_all();
564 }
565 } else {
566 DeleteBuffers(size_ - size);
567 size_ = size;
568 }
569 }
570
AttachBuffer(std::shared_ptr<AVBuffer> & buffer,bool isFilled)571 Status AVBufferQueueImpl::AttachBuffer(std::shared_ptr<AVBuffer>& buffer, bool isFilled)
572 {
573 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
574
575 auto config = buffer->GetConfig();
576 auto uniqueId = buffer->GetUniqueId();
577 {
578 std::lock_guard<std::mutex> lockGuard(queueMutex_);
579 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) == cachedBufferMap_.end(),
580 Status::ERROR_INVALID_BUFFER_ID);
581
582 NOK_RETURN(CheckConfig(config));
583
584 Status result = AttachAvailableBufferLocked(buffer);
585 FALSE_RETURN_V(result == Status::OK, result);
586 }
587
588 if (isFilled) {
589 return PushBufferOnFilled(uniqueId, isFilled);
590 }
591
592 return ReleaseBuffer(uniqueId);
593 }
594
DetachBuffer(uint64_t uniqueId,bool force)595 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId, bool force)
596 {
597 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
598
599 const auto& ele = cachedBufferMap_[uniqueId];
600
601 if (!force) {
602 // 只有生产者或消费者在获取到buffer后才能detach
603 if (ele.state == AVBUFFER_STATE_REQUESTED) {
604 MEDIA_LOG_D("detach buffer(%llu) on state requested", uniqueId);
605 } else if (ele.state == AVBUFFER_STATE_ACQUIRED) {
606 MEDIA_LOG_D("detach buffer(%llu) on state acquired", uniqueId);
607 } else {
608 MEDIA_LOG_W("detach buffer(%llu) on state %d forbidden", uniqueId, ele.state);
609 return Status::ERROR_INVALID_BUFFER_STATE;
610 }
611 }
612
613 cachedBufferMap_.erase(uniqueId);
614
615 return Status::OK;
616 }
617
DetachBuffer(uint64_t uniqueId)618 Status AVBufferQueueImpl::DetachBuffer(uint64_t uniqueId)
619 {
620 std::lock_guard<std::mutex> lockGuard(queueMutex_);
621 return DetachBuffer(uniqueId, false);
622 }
623
DetachBuffer(const std::shared_ptr<AVBuffer> & buffer)624 Status AVBufferQueueImpl::DetachBuffer(const std::shared_ptr<AVBuffer>& buffer)
625 {
626 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
627
628 return DetachBuffer(buffer->GetUniqueId());
629 }
630
AcquireBuffer(std::shared_ptr<AVBuffer> & buffer)631 Status AVBufferQueueImpl::AcquireBuffer(std::shared_ptr<AVBuffer>& buffer)
632 {
633 std::lock_guard<std::mutex> lockGuard(queueMutex_);
634 auto ret = PopFromDirtyBufferList(buffer);
635 if (ret != Status::OK) {
636 MEDIA_LOG_D("acquire buffer failed");
637 return ret;
638 }
639
640 cachedBufferMap_[buffer->GetUniqueId()].state = AVBUFFER_STATE_ACQUIRED;
641
642 return Status::OK;
643 }
644
ReleaseBuffer(uint64_t uniqueId)645 Status AVBufferQueueImpl::ReleaseBuffer(uint64_t uniqueId)
646 {
647 {
648 std::lock_guard<std::mutex> lockGuard(queueMutex_);
649 FALSE_RETURN_V(cachedBufferMap_.find(uniqueId) != cachedBufferMap_.end(), Status::ERROR_INVALID_BUFFER_ID);
650
651 FALSE_RETURN_V(cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ACQUIRED ||
652 cachedBufferMap_[uniqueId].state == AVBUFFER_STATE_ATTACHED, Status::ERROR_INVALID_BUFFER_STATE);
653
654 cachedBufferMap_[uniqueId].state = AVBUFFER_STATE_RELEASED;
655 if (cachedBufferMap_[uniqueId].isDeleting) {
656 DeleteCachedBufferById(uniqueId);
657 return Status::OK;
658 }
659
660 InsertFreeBufferInOrder(uniqueId);
661
662 requestCondition.notify_all();
663 }
664
665 // 注意:此时通知生产者有buffer可用,但实际有可能已经被request wait的生产者获取
666 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
667 if (producerListener_ != nullptr) {
668 producerListener_->OnBufferAvailable();
669 }
670
671 return Status::OK;
672 }
673
ReleaseBuffer(const std::shared_ptr<AVBuffer> & buffer)674 Status AVBufferQueueImpl::ReleaseBuffer(const std::shared_ptr<AVBuffer>& buffer)
675 {
676 FALSE_RETURN_V(buffer != nullptr, Status::ERROR_NULL_POINT_BUFFER);
677
678 return ReleaseBuffer(buffer->GetUniqueId());
679 }
680
Clear()681 Status AVBufferQueueImpl::Clear()
682 {
683 MEDIA_LOG_E("AVBufferQueueImpl Clear");
684 std::lock_guard<std::mutex> lockGuard(queueMutex_);
685 dirtyBufferList_.clear();
686 for (auto it = cachedBufferMap_.begin(); it != cachedBufferMap_.end(); it++) {
687 if (it->second.state == AVBUFFER_STATE_PUSHED || it->second.state == AVBUFFER_STATE_RETURNED) {
688 it->second.state = AVBUFFER_STATE_RELEASED;
689 InsertFreeBufferInOrder(it->first);
690 }
691 }
692 requestCondition.notify_all();
693 return Status::OK;
694 }
695
SetBrokerListener(sptr<IBrokerListener> & listener)696 Status AVBufferQueueImpl::SetBrokerListener(sptr<IBrokerListener>& listener)
697 {
698 std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
699 brokerListeners_.push_back(listener);
700 return Status::OK;
701 }
702
RemoveBrokerListener(sptr<IBrokerListener> & listener)703 Status AVBufferQueueImpl::RemoveBrokerListener(sptr<IBrokerListener>& listener)
704 {
705 std::lock_guard<std::mutex> lockGuard(brokerListenerMutex_);
706 if (!brokerListeners_.empty() && listener == brokerListeners_.back()) {
707 brokerListeners_.pop_back();
708 MEDIA_LOG_I("RemoveBrokerListener success, size: %{public}d", brokerListeners_.size());
709 } else {
710 MEDIA_LOG_E("removed item is not the back one.");
711 }
712 return Status::OK;
713 }
714
SetProducerListener(sptr<IProducerListener> & listener)715 Status AVBufferQueueImpl::SetProducerListener(sptr<IProducerListener>& listener)
716 {
717 std::lock_guard<std::mutex> lockGuard(producerListenerMutex_);
718 producerListener_ = listener;
719
720 return Status::OK;
721 }
722
SetConsumerListener(sptr<IConsumerListener> & listener)723 Status AVBufferQueueImpl::SetConsumerListener(sptr<IConsumerListener>& listener)
724 {
725 std::lock_guard<std::mutex> lockGuard(consumerListenerMutex_);
726 consumerListener_ = listener;
727
728 return Status::OK;
729 }
730
GetFilledBufferSize()731 uint32_t AVBufferQueueImpl::GetFilledBufferSize()
732 {
733 std::lock_guard<std::mutex> lockGuard(queueMutex_);
734 return dirtyBufferList_.size();
735 }
736
737 } // namespace Media
738 } // namespace OHOS
739